GestoreControlloTraffico.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.controllo_traffico;
import java.util.Date;
import org.openspcoop2.core.commons.CoreException;
import org.openspcoop2.core.constants.Costanti;
import org.openspcoop2.core.controllo_traffico.constants.TipoErrore;
import org.openspcoop2.message.constants.ServiceBinding;
import org.openspcoop2.pdd.core.PdDContext;
import org.openspcoop2.pdd.core.handlers.HandlerException;
import org.openspcoop2.pdd.logger.MsgDiagnosticiProperties;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.utils.date.DateManager;
import org.slf4j.Logger;
/**
* GestoreControlloTraffico
*
* @author Poli Andrea (poli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class GestoreControlloTraffico {
private static GestoreControlloTraffico staticInstance = null;
public static synchronized void initialize(boolean erroreGenerico){
if(staticInstance==null){
staticInstance = new GestoreControlloTraffico(erroreGenerico);
}
}
public static GestoreControlloTraffico getInstance() throws CoreException{
if(staticInstance==null){
// spotbugs warning 'SING_SINGLETON_GETTER_NOT_SYNCHRONIZED': l'istanza viene creata allo startup
synchronized (GestoreControlloTraffico.class) {
throw new CoreException("GestorePolicyAttive non inizializzato");
}
}
return staticInstance;
}
private GestoreControlloTraffico(boolean erroreGenerico){
this.erroreGenerico = erroreGenerico;
}
/**
* Threads attivi complessivi sulla Porta
**/
//private final Boolean semaphore = true; // Serve perche' senno cambiando i valori usando auto-box un-box, si perde il riferimento.
private final org.openspcoop2.utils.Semaphore lock = new org.openspcoop2.utils.Semaphore("GestoreControlloTraffico");
private long activeThreads = 0l;
private boolean pddCongestionata = false;
private boolean erroreGenerico;
public StatoTraffico getStatoControlloTraffico(String idTransazione, boolean sync) {
if(sync) {
long syncActiveThreads = 0l;
boolean syncPddCongestionata = false;
//synchronized (this.semaphore) {
this.lock.acquireThrowRuntime("getStatoControlloTraffico", idTransazione);
try {
syncActiveThreads = this.activeThreads;
syncPddCongestionata = this.pddCongestionata;
}finally {
this.lock.release("getStatoControlloTraffico", idTransazione);
}
StatoTraffico stato = new StatoTraffico();
stato.setActiveThreads(syncActiveThreads);
stato.setPddCongestionata(syncPddCongestionata);
return stato;
}
else {
//Risolve problema di deadlock che scaturiva utilizzando solamente 1 connessione e facendo un test in cui più thread invocavano con più messaggi, senza avere alcuna informazione in cache
// Si perde un pochino in precisione, ma risolve il problema del deadlock
StatoTraffico stato = new StatoTraffico();
stato.setActiveThreads(this.activeThreads);
stato.setPddCongestionata(this.pddCongestionata);
return stato;
}
}
public void addThread(ServiceBinding serviceBinding, Long maxThreadsObj, Integer thresholdObj, Boolean warningOnly, PdDContext pddContext, MsgDiagnostico msgDiag,
TipoErrore tipoErrore, boolean includiDescrizioneErrore,Logger log) throws Exception{
boolean emettiDiagnosticoMaxThreadRaggiunto = false;
boolean emettiEventoMaxThreadsViolated = false;
String descriptionEventoMaxThreadsViolated = null;
Date dataEventoMaxThreadsViolated = null;
boolean emettiEventoPddCongestionata = false;
String descriptionEventoPddCongestionata = null;
Date dataEventoPddCongestionata = null;
try{
long maxThreadsPrimitive = maxThreadsObj.longValue();
int thresholdPrimitive = (thresholdObj!=null ? thresholdObj.intValue() : 0);
String idTransazione = (pddContext!=null && pddContext.containsKey(Costanti.ID_TRANSAZIONE)) ? PdDContext.getValue(Costanti.ID_TRANSAZIONE, pddContext) : null;
long activeThreadsSyncBeforeIncrement = -1;
boolean errorSync = false;
boolean pddCongestionataSync = false;
//synchronized (this.semaphore) {
this.lock.acquire("addThread", idTransazione);
try {
activeThreadsSyncBeforeIncrement = this.activeThreads;
//System.out.println("@@@addThread CONTROLLO ["+this.activeThreads+"]<["+maxThreads+"] ("+(!(this.activeThreads<maxThreads))+")");
if(!(this.activeThreads<maxThreadsPrimitive)){
errorSync = true;
}
if(!errorSync || warningOnly) {
this.activeThreads++;
if(thresholdObj!=null){
pddCongestionataSync = this._isPddCongestionata(maxThreadsPrimitive, thresholdPrimitive);
//System.out.println("ACTIVE THREADS TOTALI: "+this.activeThreads);
//System.out.println("PDD CONGESTIONATA: "+pddCongestionata);
// verifica rispetto a variabile interna
if(this.pddCongestionata){
if(pddCongestionataSync==false){
//System.out.println("@@ NON PIU' RICHIESTO");
this.pddCongestionata = false;
}
}
else{
if(pddCongestionataSync){
//System.out.println("@@ C.T. RICHIESTO ATTIVO");
this.pddCongestionata = true;
// Emetto un evento di congestione in corso
emettiEventoPddCongestionata = true;
dataEventoPddCongestionata = DateManager.getDate();
}
}
}
}
//System.out.println("@@@addThread (dopo): "+this.activeThreads);
}finally {
this.lock.release("addThread", idTransazione);
}
HandlerException he = null;
if(errorSync) {
emettiDiagnosticoMaxThreadRaggiunto = true;
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_ACTIVE_THREADS, activeThreadsSyncBeforeIncrement+"");
if(pddContext!=null) {
pddContext.addObject(GeneratoreMessaggiErrore.PDD_CONTEXT_ACTIVE_THREADS, activeThreadsSyncBeforeIncrement);
}
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_MAX_THREADS_THRESHOLD, maxThreadsPrimitive+"");
//System.out.println("@@@addThread ERR");
emettiEventoMaxThreadsViolated = true;
descriptionEventoMaxThreadsViolated = "Superato il numero di richieste complessive ("+maxThreadsPrimitive+") gestibili dalla PdD";
dataEventoMaxThreadsViolated = DateManager.getDate();
if(pddContext!=null) {
GeneratoreMessaggiErrore.addPddContextInfo_ControlloTrafficoMaxThreadsViolated(pddContext,warningOnly);
}
String msgDiagnostico = null;
if(warningOnly) {
msgDiag.getMessaggio_replaceKeywords(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED_WARNING_ONLY);
}
else {
msgDiag.getMessaggio_replaceKeywords(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED);
}
he = GeneratoreMessaggiErrore.getMaxThreadsViolated(
msgDiagnostico,
this.erroreGenerico, pddContext
);
he.setEmettiDiagnostico(false);
GeneratoreMessaggiErrore.configureHandlerExceptionByTipoErrore(serviceBinding, he, tipoErrore, includiDescrizioneErrore,log);
if(warningOnly == false) {
throw he;
}
}
long activeThreadsSyncAfterIncrement = activeThreadsSyncBeforeIncrement+1;
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_ACTIVE_THREADS, activeThreadsSyncAfterIncrement+""); // per policy applicabilità
if(thresholdObj!=null){
// Aggiungo l'informazione se la pdd risulta congestionata nel pddContext.
if(pddContext!=null) {
pddContext.addObject(CostantiControlloTraffico.PDD_CONTEXT_PDD_CONGESTIONATA, pddCongestionataSync);
}
if(emettiEventoPddCongestionata) {
descriptionEventoPddCongestionata = this._buildDescription(maxThreadsPrimitive, thresholdPrimitive, msgDiag);
}
// Il timer dovra' vedere se esiste un evento di controllo del traffico.
// Se non esiste utilizzera' il metodo 'isControlloTrafficoAttivo' per vedere che il controllo del traffico e' rientrato.
}
if(he!=null) {
// caso di warning only
throw he;
}
}
finally{
// *** ATTIVITA DA FARE FUORI DAL SYNCHRONIZED **
// fuori dal synchronized (per evitare deadlock)
if(emettiEventoMaxThreadsViolated){
CategoriaEventoControlloTraffico evento = null;
if(warningOnly) {
evento = CategoriaEventoControlloTraffico.LIMITE_GLOBALE_RICHIESTE_SIMULTANEE_WARNING_ONLY;
}
else {
evento = CategoriaEventoControlloTraffico.LIMITE_GLOBALE_RICHIESTE_SIMULTANEE;
}
NotificatoreEventi.getInstance().log(evento, dataEventoMaxThreadsViolated, descriptionEventoMaxThreadsViolated);
}
// fuori dal synchronized
if(emettiDiagnosticoMaxThreadRaggiunto){
if(warningOnly) {
msgDiag.logPersonalizzato(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED_WARNING_ONLY);
}
else {
msgDiag.logPersonalizzato(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED);
}
}
// fuori dal synchronized (per evitare deadlock)
if(emettiEventoPddCongestionata){
NotificatoreEventi.getInstance().log(CategoriaEventoControlloTraffico.CONGESTIONE_PORTA_DOMINIO, dataEventoPddCongestionata, descriptionEventoPddCongestionata);
}
}
}
public void removeThread(Long maxThreadsObj, Integer thresholdObj, String idTransazione) throws Exception{
//synchronized (this.semaphore) {
if(maxThreadsObj==null) {
throw new Exception("MaxThreads param is null");
}
long maxThreadsPrimitive = maxThreadsObj.longValue();
int thresholdPrimitive = (thresholdObj!=null ? thresholdObj.intValue() : 0);
this.lock.acquire("removeThread", idTransazione);
try {
this.activeThreads--;
if(thresholdObj!=null && this.pddCongestionata){
// System.out.println("AGGORNO CONGESTIONE");
// boolean old = this.pddCongestionata;
this.pddCongestionata = this._isPddCongestionata(maxThreadsPrimitive, thresholdPrimitive);
// if(old!=this.pddCongestionata){
// System.out.println("OLD["+old+"] NEW["+this.pddCongestionata+"]");
// }
}
//System.out.println("@@@removeThread (dopo): "+this.activeThreads);
}finally {
this.lock.release("removeThread", idTransazione);
}
}
public long sizeActiveThreads(){
//synchronized (this.semaphore) {
this.lock.acquireThrowRuntime("sizeActiveThreads");
try {
//System.out.println("@@@SIZE: "+this.activeThreads);
return this.activeThreads;
}finally {
this.lock.release("sizeActiveThreads");
}
}
public Boolean isPortaDominioCongestionata(Long maxThreadsObj, Integer thresholdObj) {
//synchronized (this.semaphore) {
long maxThreadsPrimitive = maxThreadsObj.longValue();
int thresholdPrimitive = (thresholdObj!=null ? thresholdObj.intValue() : 0);
this.lock.acquireThrowRuntime("isPortaDominioCongestionata");
try {
if(thresholdObj!=null){
this.pddCongestionata = this._isPddCongestionata(maxThreadsPrimitive, thresholdPrimitive); // refresh per evitare che l'ultimo thread abbia lasciato attivo il controllo
}
else{
this.pddCongestionata = false; // controllo non attivo
}
return this.pddCongestionata;
}finally {
this.lock.release("isPortaDominioCongestionata");
}
}
// Utilities
private boolean _isPddCongestionata(long maxThreads, int threshold){
double dActiveT = maxThreads;
double dThreshold = threshold;
double t = dActiveT / 100d;
double tt = t * dThreshold;
int numeroThreadSoglia = (int)tt;
return this.activeThreads > numeroThreadSoglia; // non ci vuole >=, nella govwayConsole si dice chiaramente 'Il controllo del traffico verrà attivato oltre le <numeroThreadSoglia> richieste '
}
private String _buildDescription(long maxThreads, int threshold, MsgDiagnostico msgDiag){
StringBuilder bf = new StringBuilder();
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_MAX_THREADS_THRESHOLD, maxThreads+"");
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_CONTROLLO_TRAFFICO_THRESHOLD, threshold+"");
bf.append(msgDiag.getMessaggio_replaceKeywords(MsgDiagnosticiProperties.MSG_DIAG_ALL, GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_PDD_CONGESTIONATA));
return bf.toString();
}
}