GestoreControlloTraffico.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.controllo_traffico;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import org.openspcoop2.core.commons.CoreException;
import org.openspcoop2.core.config.driver.DriverConfigurazioneException;
import org.openspcoop2.core.controllo_traffico.constants.TipoErrore;
import org.openspcoop2.message.constants.ServiceBinding;
import org.openspcoop2.pdd.core.PdDContext;
import org.openspcoop2.pdd.core.handlers.HandlerException;
import org.openspcoop2.pdd.logger.MsgDiagnosticiProperties;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.protocol.sdk.ProtocolException;
import org.openspcoop2.utils.UtilsException;
import org.openspcoop2.utils.date.DateManager;
import org.slf4j.Logger;
/**
* GestoreControlloTraffico
*
* @author Poli Andrea (poli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class GestoreControlloTraffico {
private static GestoreControlloTraffico staticInstance = null;
public static synchronized void initialize(boolean erroreGenerico){
if(staticInstance==null){
staticInstance = new GestoreControlloTraffico(erroreGenerico);
}
}
public static GestoreControlloTraffico getInstance() throws CoreException{
if(staticInstance==null){
// spotbugs warning 'SING_SINGLETON_GETTER_NOT_SYNCHRONIZED': l'istanza viene creata allo startup
synchronized (GestoreControlloTraffico.class) {
throw new CoreException("GestorePolicyAttive non inizializzato");
}
}
return staticInstance;
}
private GestoreControlloTraffico(boolean erroreGenerico){
this.erroreGenerico = erroreGenerico;
}
/**
* Threads attivi complessivi sulla Porta
**/
private AtomicLong activeThreads = new AtomicLong(0l);
private boolean erroreGenerico;
private Long maxThreads = null;
private Integer threshold = null;
public StatoTraffico getStatoControlloTraffico() {
long currentActiveThreads = this.activeThreads.get();
StatoTraffico stato = new StatoTraffico();
stato.setActiveThreads(currentActiveThreads);
stato.setPddCongestionata(this.isPddCongestionata(currentActiveThreads));
return stato;
}
public void addThread(ServiceBinding serviceBinding,
Long maxThreadsObj, Integer thresholdObj, Boolean warningOnly,
PdDContext pddContext, MsgDiagnostico msgDiag, TipoErrore tipoErrore,
boolean includiDescrizioneErrore,Logger log) throws ProtocolException, HandlerException, CoreException, UtilsException, DriverConfigurazioneException {
boolean emettiDiagnosticoMaxThreadRaggiunto = false;
boolean emettiEventoMaxThreadsViolated = false;
String descriptionEventoMaxThreadsViolated = null;
Date dataEventoMaxThreadsViolated = null;
boolean emettiEventoPddCongestionata = false;
String descriptionEventoPddCongestionata = null;
Date dataEventoPddCongestionata = null;
try{
long maxThreadsPrimitive = maxThreadsObj.longValue();
int thresholdPrimitive = (thresholdObj!=null ? thresholdObj.intValue() : 0);
long activeThreadsSyncBeforeIncrement = -1;
boolean errorSync = false;
boolean pddCongestionataSync = false;
/**
* Gestione della concorrenza lock-free, la gestione dei thread massimi
* consentiti viene fatta usando una variabile atomica, in questo caso
* l'unica situazione negativa si verifica nel caso n thread contemporaneamente
* prendano
*/
/**String idTransazione = null;
if(pddContext!=null) {
idTransazione = (String) pddContext.getObject(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE);
}
System.out.println("["+idTransazione+"] PRIMA: "+this.activeThreads.get());*/
// utilizzo una variabile atomica per gestire la concorrenza
long currentActiveThreads = this.activeThreads.incrementAndGet();
/**System.out.println("["+idTransazione+"] DOPO: "+currentActiveThreads);*/
// nel caso l'incremento abbia superato il massimo di thread consentiti devo decrementare il contatore
if (currentActiveThreads > maxThreadsPrimitive) {
errorSync = true;
// nel caso warningOnly posso continuare indisturbato
if (!warningOnly.booleanValue())
this.activeThreads.decrementAndGet();
}
// nel caso il thread sia stato correttamente aggiunto controllo la congestione
if((!errorSync || warningOnly.booleanValue()) && thresholdObj!=null){
boolean prePddCongestionata = this.isPddCongestionata(maxThreadsPrimitive, thresholdPrimitive, currentActiveThreads - 1);
boolean curPddCongestionata = this.isPddCongestionata(maxThreadsPrimitive, thresholdPrimitive, currentActiveThreads);
// se l'aggiunta del thread corrente ha congestionato il sistema mando il segnale
if (!prePddCongestionata && curPddCongestionata) {
emettiEventoPddCongestionata = true;
dataEventoPddCongestionata = DateManager.getDate();
}
}
HandlerException he = null;
if(errorSync) {
emettiDiagnosticoMaxThreadRaggiunto = true;
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_ACTIVE_THREADS, activeThreadsSyncBeforeIncrement+"");
if(pddContext!=null) {
pddContext.addObject(GeneratoreMessaggiErrore.PDD_CONTEXT_ACTIVE_THREADS, activeThreadsSyncBeforeIncrement);
}
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_MAX_THREADS_THRESHOLD, maxThreadsPrimitive+"");
emettiEventoMaxThreadsViolated = true;
descriptionEventoMaxThreadsViolated = "Superato il numero di richieste complessive ("+maxThreadsPrimitive+") gestibili dalla PdD";
dataEventoMaxThreadsViolated = DateManager.getDate();
if(pddContext!=null) {
GeneratoreMessaggiErrore.addPddContextInfoControlloTrafficoMaxThreadsViolated(pddContext,warningOnly);
}
String msgDiagnostico = null;
if(warningOnly.booleanValue()) {
msgDiag.getMessaggio_replaceKeywords(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED_WARNING_ONLY);
}
else {
msgDiag.getMessaggio_replaceKeywords(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED);
}
he = GeneratoreMessaggiErrore.getMaxThreadsViolated(
msgDiagnostico,
this.erroreGenerico, pddContext
);
he.setEmettiDiagnostico(false);
GeneratoreMessaggiErrore.configureHandlerExceptionByTipoErrore(serviceBinding, he, tipoErrore, includiDescrizioneErrore,log);
if(!warningOnly.booleanValue()) {
throw he;
}
}
long activeThreadsSyncAfterIncrement = activeThreadsSyncBeforeIncrement+1;
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_ACTIVE_THREADS, activeThreadsSyncAfterIncrement+""); // per policy applicabilità
if(thresholdObj!=null){
// Aggiungo l'informazione se la pdd risulta congestionata nel pddContext.
if(pddContext!=null) {
pddContext.addObject(CostantiControlloTraffico.PDD_CONTEXT_PDD_CONGESTIONATA, pddCongestionataSync);
}
if(emettiEventoPddCongestionata) {
descriptionEventoPddCongestionata = this.buildDescription(maxThreadsPrimitive, thresholdPrimitive, msgDiag);
}
// Il timer dovra' vedere se esiste un evento di controllo del traffico.
// Se non esiste utilizzera' il metodo 'isControlloTrafficoAttivo' per vedere che il controllo del traffico e' rientrato.
}
if(he!=null) {
// caso di warning only
throw he;
}
}
finally{
if(emettiEventoMaxThreadsViolated){
CategoriaEventoControlloTraffico evento = null;
if(warningOnly.booleanValue()) {
evento = CategoriaEventoControlloTraffico.LIMITE_GLOBALE_RICHIESTE_SIMULTANEE_WARNING_ONLY;
}
else {
evento = CategoriaEventoControlloTraffico.LIMITE_GLOBALE_RICHIESTE_SIMULTANEE;
}
NotificatoreEventi.getInstance().log(evento, dataEventoMaxThreadsViolated, descriptionEventoMaxThreadsViolated);
}
// fuori dal synchronized
if(emettiDiagnosticoMaxThreadRaggiunto){
if(warningOnly.booleanValue()) {
msgDiag.logPersonalizzato(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED_WARNING_ONLY);
}
else {
msgDiag.logPersonalizzato(GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_MAXREQUESTS_VIOLATED);
}
}
// fuori dal synchronized (per evitare deadlock)
if(emettiEventoPddCongestionata){
NotificatoreEventi.getInstance().log(CategoriaEventoControlloTraffico.CONGESTIONE_PORTA_DOMINIO, dataEventoPddCongestionata, descriptionEventoPddCongestionata);
}
}
}
public void removeThread() {
this.activeThreads.decrementAndGet();
}
public long sizeActiveThreads(){
return this.activeThreads.get();
}
public Boolean isPortaDominioCongestionata(Long maxThreadsObj, Integer thresholdObj) {
return this.isPddCongestionata(maxThreadsObj, thresholdObj, this.activeThreads.get());
}
// Utilities
/**
* Restituisce l'ultimo valore calcolato di PddCongestionata
* @param activeThreads: numero di thread attualmente attivi
* @return
*/
private boolean isPddCongestionata(long activeThreads) {
return this.isPddCongestionata(this.maxThreads, this.threshold, activeThreads);
}
/**
* Restituisce se la Pdd risulta congestionata
* @param maxThreads: numero massimo di thread attivi
* @param threshold: valore percentuale sul numero di thread massimo oltre il quale
* la pdd risulta congestionata (null se non attivo)
* @param activeThreads: numero di thread attivi
* @return true se il controllo della congestione della pdd é abilitato e risulta
* congestionata
*/
private boolean isPddCongestionata(Long maxThreads, Integer threshold, long activeThreads){
// mi salvo i valori di maxThreads e threshold in caso volessi restituire l'ultimo valore calcolato
this.maxThreads = maxThreads;
this.threshold = threshold;
if (threshold == null || maxThreads == null)
return false;
double dActiveT = maxThreads;
double dThreshold = threshold;
double t = dActiveT / 100d;
double tt = t * dThreshold;
int numeroThreadSoglia = (int)tt;
return activeThreads > numeroThreadSoglia; // non ci vuole >=, nella govwayConsole si dice chiaramente 'Il controllo del traffico verrà attivato oltre le <numeroThreadSoglia> richieste '
}
private String buildDescription(long maxThreads, int threshold, MsgDiagnostico msgDiag){
StringBuilder bf = new StringBuilder();
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_MAX_THREADS_THRESHOLD, maxThreads+"");
msgDiag.addKeyword(GeneratoreMessaggiErrore.TEMPLATE_CONTROLLO_TRAFFICO_THRESHOLD, threshold+"");
bf.append(msgDiag.getMessaggio_replaceKeywords(MsgDiagnosticiProperties.MSG_DIAG_ALL, GeneratoreMessaggiErrore.MSG_DIAGNOSTICO_INTERCEPTOR_CONTROLLO_TRAFFICO_PDD_CONGESTIONATA));
return bf.toString();
}
}