TimerConsegnaContenutiApplicativi.java
- /*
- * GovWay - A customizable API Gateway
- * https://govway.org
- *
- * Copyright (c) 2005-2025 Link.it srl (https://link.it).
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 3, as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package org.openspcoop2.pdd.timers;
- import java.sql.Connection;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- import org.openspcoop2.pdd.config.ConfigurazioneCoda;
- import org.openspcoop2.pdd.config.ConfigurazionePdDManager;
- import org.openspcoop2.pdd.config.ConfigurazionePriorita;
- import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
- import org.openspcoop2.pdd.core.CostantiPdD;
- import org.openspcoop2.pdd.core.GestoreMessaggi;
- import org.openspcoop2.pdd.core.MessaggioServizioApplicativo;
- import org.openspcoop2.pdd.core.state.OpenSPCoopStateDBManager;
- import org.openspcoop2.pdd.core.state.OpenSPCoopStateful;
- import org.openspcoop2.pdd.logger.MsgDiagnostico;
- import org.openspcoop2.pdd.services.OpenSPCoop2Startup;
- import org.openspcoop2.protocol.engine.constants.Costanti;
- import org.openspcoop2.protocol.registry.RegistroServiziManager;
- import org.openspcoop2.protocol.sdk.state.StateMessage;
- import org.openspcoop2.utils.TipiDatabase;
- import org.openspcoop2.utils.Utilities;
- import org.openspcoop2.utils.UtilsException;
- import org.openspcoop2.utils.date.DateManager;
- import org.openspcoop2.utils.id.serial.InfoStatistics;
- import org.openspcoop2.utils.semaphore.Semaphore;
- import org.openspcoop2.utils.semaphore.SemaphoreConfiguration;
- import org.openspcoop2.utils.semaphore.SemaphoreMapping;
- import org.openspcoop2.utils.threads.IGestoreCodaRunnableInstance;
- import org.openspcoop2.utils.threads.Runnable;
- import org.openspcoop2.utils.threads.RunnableLogger;
- /**
- * Timer che si occupa di re-inoltrare i messaggi in riconsegna
- *
- *
- * @author Poli Andrea (apoli@link.it)
- * @author $Author$
- * @version $Rev$, $Date$
- */
- public class TimerConsegnaContenutiApplicativi implements IGestoreCodaRunnableInstance {
- private static TimerState STATE = TimerState.OFF; // abilitato in OpenSPCoop2Startup al momento dell'avvio
- public static TimerState getSTATE() {
- return STATE;
- }
- public static void setSTATE(TimerState sTATE) {
- STATE = sTATE;
- }
- private MsgDiagnostico msgDiag = null;
- private RunnableLogger log;
- private RunnableLogger logSql;
- private OpenSPCoop2Properties propertiesReader = null;
- private boolean debug;
- private String clusterId;
- private RegistroServiziManager registroServiziReader;
- private ConfigurazionePdDManager configurazionePdDReader;
- private ConfigurazioneCoda configurazioneCoda;
-
- private List<ConfigurazionePriorita> configurazioniPriorita;
-
- private TimerLock timerLock = null;
- /** Semaforo */
- private Semaphore semaphore = null;
- private InfoStatistics semaphore_statistics;
- private Date lastCheckMessaggiDaRispedire = null;
-
- public TimerConsegnaContenutiApplicativi(ConfigurazioneCoda configurazioneCoda, MsgDiagnostico msgDiag,
- RunnableLogger log, RunnableLogger logSql,
- OpenSPCoop2Properties p,
- ConfigurazionePdDManager configurazionePdDReader,RegistroServiziManager registroServiziReader) throws TimerException{
- this.configurazioneCoda = configurazioneCoda;
- this.msgDiag = msgDiag;
- this.log = log;
- this.logSql = logSql;
- this.propertiesReader = p;
- this.debug = configurazioneCoda.isDebug();
- this.clusterId = p.getClusterId(false);
-
- this.configurazioniPriorita = new ArrayList<ConfigurazionePriorita>();
- List<String> prioritaList = this.propertiesReader.getTimerConsegnaContenutiApplicativiPriorita();
- for (int i = 0; i < prioritaList.size(); i++) {
- String priorita = prioritaList.get(i);
- this.configurazioniPriorita.add(this.propertiesReader.getTimerConsegnaContenutiApplicativiConfigurazionePriorita(priorita));
- }
-
- this.configurazionePdDReader = configurazionePdDReader;
- this.registroServiziReader = registroServiziReader;
- // deve essere utilizzato lo stesso lock per GestoreMessaggi, ConsegnaContenuti, GestoreBuste per risolvere problema di eliminazione descritto in GestoreMessaggi metodo deleteMessageWithLock
- //this.timerLock = new TimerLock(TipoLock.GESTIONE_REPOSITORY_MESSAGGI);
- // l'utilizzo commentato sopra era ERRATO: i messaggi salvati con il timer di consegna contenuti applicativi vengono salvati con un nuovo identificativo 'gw-0-e20ae327-791c-40ef-84ad-4b141a0ef93f' e quindi non impattano sul problema descritto sopra.
- this.timerLock = new TimerLock(TipoLock.CONSEGNA_NOTIFICHE, configurazioneCoda.getName());
-
- if(this.propertiesReader.isTimerLockByDatabase()) {
- this.semaphore_statistics = new InfoStatistics();
- SemaphoreConfiguration config = GestoreMessaggi.newSemaphoreConfiguration(this.propertiesReader.getTimerConsegnaContenutiApplicativiLockMaxLife(),
- this.propertiesReader.getTimerConsegnaContenutiApplicativiLockIdleTime());
- TipiDatabase databaseType = TipiDatabase.toEnumConstant(this.propertiesReader.getDatabaseType());
- try {
- this.semaphore = new Semaphore(this.semaphore_statistics, SemaphoreMapping.newInstance(this.timerLock.getIdLock()),
- config, databaseType, this.log.getLog());
- }catch(Exception e) {
- throw new TimerException(e.getMessage(),e);
- }
- }
- }
- @Override
- public void initialize(RunnableLogger log) throws UtilsException{
-
- OpenSPCoopStateful openspcoopstateGestore = new OpenSPCoopStateful();
- try {
- this.logDebug("Rilascio eventuali messaggi con lock appesi da riconsegnare verso il modulo ConsegnaContenutiApplicativi ...");
-
- openspcoopstateGestore.initResource(this.propertiesReader.getIdentitaPortaDefaultWithoutProtocol(),TimerConsegnaContenutiApplicativiThread.ID_MODULO, "initialize",
- OpenSPCoopStateDBManager.smistatoreMessaggiPresiInCarico);
- Connection connectionDB = ((StateMessage)openspcoopstateGestore.getStatoRichiesta()).getConnectionDB();
- // GestoreMessaggi da Ricercare
- GestoreMessaggi gestoreMsgSearch = new GestoreMessaggi(openspcoopstateGestore, true,this.logSql.getLog(),this.msgDiag, null);
-
- String causaMessaggiINBOXDaRiconsegnare = "Rilascio eventuali messaggi con lock appesi da riconsegnare verso il modulo ConsegnaContenutiApplicativi";
- try{
- GestoreMessaggi.acquireLock(
- this.semaphore, connectionDB, this.timerLock,
- this.msgDiag, causaMessaggiINBOXDaRiconsegnare,
- this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockAttesaAttiva(),
- this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockCheckInterval());
-
- gestoreMsgSearch.releaseMessaggiPresaInCosegna(this.configurazioneCoda.getName(), this.clusterId, this.debug, this.logSql);
-
- }finally{
- try{
- GestoreMessaggi.releaseLock(
- this.semaphore, connectionDB, this.timerLock,
- this.msgDiag, causaMessaggiINBOXDaRiconsegnare);
- }catch(Exception e){
- // ignore
- }
- }
-
- this.logDebug("Rilascio effettuato di eventuali messaggi con lock appesi da riconsegnare verso il modulo ConsegnaContenutiApplicativi");
- }
- catch (Exception e) {
- this.msgDiag.logErroreGenerico(e,"GestioneMessaggiRilascioLockRiconsegnaConsegnaContenutiApplicativi");
- this.logError("Riscontrato errore durante la gestione del repository dei messaggi (Rilascio lock per riconsegna verso ConsegnaContenutiApplicativi): "+ e.getMessage(),e);
- }finally{
- if(openspcoopstateGestore!=null)
- openspcoopstateGestore.releaseResource();
- }
- }
-
- private static final String DATA_START = "DATA_START";
-
- @Override
- public void logCheckInProgress(Map<String, Object> context) {
- // Prendo la gestione
- this.msgDiag.logPersonalizzato("controlloInCorso");
- this.log.info(this.msgDiag.getMessaggio_replaceKeywords("controlloInCorso"));
- long startControlloRepositoryMessaggi = DateManager.getTimeMillis();
- context.put(DATA_START, startControlloRepositoryMessaggi);
- }
-
- @Override
- public void logRegisteredThreads(Map<String, Object> context, int nuoviThreadsAttivati) {
- if(nuoviThreadsAttivati>0){
- this.msgDiag.addKeyword(CostantiPdD.KEY_TIMER_CONSEGNA_CONTENUTI_APPLICATIVI_NUMERO_MESSAGGI_INOLTRATI,nuoviThreadsAttivati+"");
- this.msgDiag.logPersonalizzato("ricercaMessaggiDaInoltrare");
- }
- }
-
- @Override
- public void logCheckFinished(Map<String, Object> context) {
-
- // end
- long endControlloRepositoryMessaggi = DateManager.getTimeMillis();
- long startControlloRepositoryMessaggi = (Long) context.get(DATA_START);
- long diff = (endControlloRepositoryMessaggi-startControlloRepositoryMessaggi);
- this.log.info("Controllo Repository Messaggi (Riconsegna verso ConsegnaContenutiApplicativi) terminato in "+Utilities.convertSystemTimeIntoStringMillisecondi(diff, true));
- }
-
-
- @Override
- public List<Runnable> nextRunnable(int limit) throws UtilsException{
-
- List<Runnable> returnNull = null;
-
- // Controllo che il sistema non sia andando in shutdown
- if(OpenSPCoop2Startup.contextDestroyed){
- this.logError("Rilevato sistema in shutdown");
- return returnNull;
- }
- // Controllo che l'inizializzazione corretta delle risorse sia effettuata
- if(!OpenSPCoop2Startup.initialize){
- this.msgDiag.logFatalError("inizializzazione di OpenSPCoop non effettuata", "Check Inizializzazione");
- String msgErrore = "Riscontrato errore: inizializzazione del Timer o di OpenSPCoop non effettuata";
- this.logError(msgErrore);
- throw new UtilsException(msgErrore);
- }
- // Controllo risorse di sistema disponibili
- if( !TimerMonitoraggioRisorseThread.isRisorseDisponibili()){
- this.logError("Risorse di sistema non disponibili: "+TimerMonitoraggioRisorseThread.getRisorsaNonDisponibile().getMessage(),TimerMonitoraggioRisorseThread.getRisorsaNonDisponibile());
- return returnNull;
- }
- if( !MsgDiagnostico.gestoreDiagnosticaDisponibile){
- this.logError("Sistema di diagnostica non disponibile: "+MsgDiagnostico.motivoMalfunzionamentoDiagnostici.getMessage(),MsgDiagnostico.motivoMalfunzionamentoDiagnostici);
- return returnNull;
- }
-
- // Controllo che il timer non sia stato momentaneamente disabilitato
- if(!TimerState.ENABLED.equals(STATE)) {
- this.msgDiag.logPersonalizzato("disabilitato");
- this.log.info(this.msgDiag.getMessaggio_replaceKeywords("disabilitato"));
- return returnNull;
- }
-
- OpenSPCoopStateful openspcoopstateGestore = new OpenSPCoopStateful();
- try {
- this.logDebug("Inizializzazione connessione al db per ricercare nuovi threads da attivare (limit: "+limit+") ...");
- openspcoopstateGestore.initResource(this.propertiesReader.getIdentitaPortaDefaultWithoutProtocol(),TimerConsegnaContenutiApplicativiThread.ID_MODULO, "nextRunnable",
- OpenSPCoopStateDBManager.smistatoreMessaggiPresiInCarico);
- Connection connectionDB = ((StateMessage)openspcoopstateGestore.getStatoRichiesta()).getConnectionDB();
- boolean verificaPresenzaMessaggiDaRispedire = false;
- boolean calcolaDataMinimaMessaggiRispedire = false;
- Integer secondiAnzianitaPerIniziareSpedireNuovoMessaggio = this.configurazioneCoda.getScheduleMessageAfter();
- if(this.lastCheckMessaggiDaRispedire==null) {
- this.lastCheckMessaggiDaRispedire=DateManager.getDate();
- verificaPresenzaMessaggiDaRispedire = true;
- }
- else {
- Date expired = new Date(DateManager.getTimeMillis()-(1000*this.configurazioneCoda.getNextMessages_consegnaFallita_intervalloControllo()));
- this.logDebug("("+this.configurazioneCoda.getName()+") Verifica check messaggi da spedire previsto quando ultimo chek '"+
- org.openspcoop2.utils.date.DateUtils.getSimpleDateFormatMs().format(this.lastCheckMessaggiDaRispedire)+"' < '"+org.openspcoop2.utils.date.DateUtils.getSimpleDateFormatMs().format(expired)+"' ...");
- if(this.lastCheckMessaggiDaRispedire.before(expired)) {
- verificaPresenzaMessaggiDaRispedire = true;
- this.lastCheckMessaggiDaRispedire=DateManager.getDate();
- }
- }
- if(verificaPresenzaMessaggiDaRispedire) {
- calcolaDataMinimaMessaggiRispedire = this.configurazioneCoda.isNextMessages_consegnaFallita_calcolaDataMinimaRiconsegna();
- }
- this.logDebug("("+this.configurazioneCoda.getName()+") verificaPresenzaMessaggiDaRispedire:"+verificaPresenzaMessaggiDaRispedire+" calcolaDataMinimaMessaggiRispedire:"+calcolaDataMinimaMessaggiRispedire);
-
- // GestoreMessaggi da Ricercare
- GestoreMessaggi gestoreMsgSearch = new GestoreMessaggi(openspcoopstateGestore, true,this.logSql.getLog(),this.msgDiag, null);
-
- Date now = DateManager.getDate();
- List<MessaggioServizioApplicativo> msgDaRiconsegnareINBOX = new ArrayList<>();
-
- // APPLICATIVI PRIORITARI
- List<String> serviziApplicativiPrioritari = this.configurazionePdDReader.getServiziApplicativiConsegnaNotifichePrioritarie(this.configurazioneCoda.getName());
- if(serviziApplicativiPrioritari!=null && !serviziApplicativiPrioritari.isEmpty()) {
-
- String prefix = "[Applicativi Prioritari] ";
- String causale = prefix+"Messaggi da riconsegnare verso il modulo ConsegnaContenutiApplicativi";
- try {
- this.logDebug(prefix+"Acquisizione lock per ricercare nuovi threads da attivare (limit: "+limit+") ...");
- this.lock(connectionDB, causale);
-
- this.logDebug(prefix+"Lock acquisito, ricerca nuovi threads da attivare (limit: "+limit+") ...");
- List<MessaggioServizioApplicativo> msgDaRiconsegnareINBOX_priorita =
- gestoreMsgSearch.readMessaggiDaRiconsegnareIntoBoxByServiziApplicativPrioritari(limit,
- verificaPresenzaMessaggiDaRispedire, calcolaDataMinimaMessaggiRispedire,secondiAnzianitaPerIniziareSpedireNuovoMessaggio,
- now,
- this.propertiesReader.getTimerConsegnaContenutiApplicativiPresaInConsegnaMaxLife(),
- this.debug,this.logSql,
- this.configurazioneCoda.getName(),
- serviziApplicativiPrioritari.toArray(new String[1]));
-
- if(msgDaRiconsegnareINBOX_priorita!=null && !msgDaRiconsegnareINBOX_priorita.isEmpty()) {
- this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limit+"); prendo in carico "+
- msgDaRiconsegnareINBOX_priorita.size()+" messaggi ...");
-
- for (MessaggioServizioApplicativo messaggioServizioApplicativo : msgDaRiconsegnareINBOX_priorita) {
- GestoreMessaggi messaggioDaInviare = new GestoreMessaggi(openspcoopstateGestore,true,messaggioServizioApplicativo.getIdMessaggio(),Costanti.INBOX,
- this.log.getLog(),this.msgDiag,null);
- messaggioDaInviare.updateMessaggioPresaInCosegna(messaggioServizioApplicativo.getServizioApplicativo(),
- this.clusterId, this.debug, this.logSql);
-
- msgDaRiconsegnareINBOX.add(messaggioServizioApplicativo);
- }
-
- this.logDebug(prefix+"Presa in carico "+msgDaRiconsegnareINBOX_priorita.size()+" messaggi terminata, rilascio lock...");
- }
- else {
- this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limit+"); non sono presenti messaggi, rilascio lock...");
- }
-
- }finally{
- this.releaseLock(connectionDB, causale);
- }
- this.logDebug(prefix+"Lock rilasciato");
- }
-
- int finestraAncoraDisponibileDopoApplicativiPrioritari = limit - msgDaRiconsegnareINBOX.size();
-
- if(finestraAncoraDisponibileDopoApplicativiPrioritari>0) {
- for (ConfigurazionePriorita configurazionePriorita : this.configurazioniPriorita) {
-
- String prefix = "[P-"+configurazionePriorita.getLabel()+"] ";
-
- int limitPriorita = 0;
- if(configurazionePriorita.isNessunaPriorita()) {
- limitPriorita = limit - msgDaRiconsegnareINBOX.size();
- this.logDebug(prefix+"Calcolo limit; cerco entries rimaste senza guardare la priorità ("+limitPriorita+") ...");
- }
- else {
- limitPriorita = (finestraAncoraDisponibileDopoApplicativiPrioritari * configurazionePriorita.getPercentuale()) / 100;
- this.logDebug(prefix+"Calcolo limit; "+configurazionePriorita.getPercentuale()+"% di "+finestraAncoraDisponibileDopoApplicativiPrioritari+": "+limitPriorita+" ...");
- }
- if(limitPriorita<=0) {
- this.logDebug(prefix+"Per la priorità non è necessario cercare alcun messaggio");
- continue;
- }
-
- String causale = prefix+"Messaggi da riconsegnare verso il modulo ConsegnaContenutiApplicativi";
- try {
- this.logDebug(prefix+"Acquisizione lock per ricercare nuovi threads da attivare (limit: "+limitPriorita+") ...");
- this.lock(connectionDB, causale);
-
- this.logDebug(prefix+"Lock acquisito, ricerca nuovi threads da attivare (limit: "+limitPriorita+") ...");
- List<MessaggioServizioApplicativo> msgDaRiconsegnareINBOXpriorita =
- gestoreMsgSearch.readMessaggiDaRiconsegnareIntoBoxByPriorita(limitPriorita,
- verificaPresenzaMessaggiDaRispedire, calcolaDataMinimaMessaggiRispedire, secondiAnzianitaPerIniziareSpedireNuovoMessaggio,
- now,
- this.propertiesReader.getTimerConsegnaContenutiApplicativiPresaInConsegnaMaxLife(),
- this.debug,this.logSql,
- this.configurazioneCoda.getName(),
- configurazionePriorita.isNessunaPriorita() ? null : configurazionePriorita.getName());
-
- if(msgDaRiconsegnareINBOXpriorita!=null && !msgDaRiconsegnareINBOXpriorita.isEmpty()) {
- this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limitPriorita+"); prendo in carico "+
- msgDaRiconsegnareINBOXpriorita.size()+" messaggi ...");
-
- // La finestra non deve essere ricalcolata
- /**finestraAncoraDisponibileDopoApplicativiPrioritari = finestraAncoraDisponibileDopoApplicativiPrioritari - msgDaRiconsegnareINBOX_priorita.size();*/
-
- for (MessaggioServizioApplicativo messaggioServizioApplicativo : msgDaRiconsegnareINBOXpriorita) {
- GestoreMessaggi messaggioDaInviare = new GestoreMessaggi(openspcoopstateGestore,true,messaggioServizioApplicativo.getIdMessaggio(),Costanti.INBOX,
- this.log.getLog(),this.msgDiag,null);
- messaggioDaInviare.updateMessaggioPresaInCosegna(messaggioServizioApplicativo.getServizioApplicativo(),
- this.clusterId, this.debug, this.logSql);
-
- msgDaRiconsegnareINBOX.add(messaggioServizioApplicativo);
- }
-
- this.logDebug(prefix+"Presa in carico "+msgDaRiconsegnareINBOXpriorita.size()+" messaggi terminata, rilascio lock...");
- }
- else {
- this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limitPriorita+"); non sono presenti messaggi, rilascio lock...");
- }
-
- }finally{
- this.releaseLock(connectionDB, causale);
- }
- this.logDebug(prefix+"Lock rilasciato");
-
- }
- }
-
- this.logDebug("Creazione Runnable ...");
- List<Runnable> listRunnable = null;
- if(msgDaRiconsegnareINBOX!=null && !msgDaRiconsegnareINBOX.isEmpty()) {
- listRunnable = new ArrayList<>();
- for (MessaggioServizioApplicativo messaggioServizioApplicativo : msgDaRiconsegnareINBOX) {
- TimerConsegnaContenutiApplicativiSender sender =
- new TimerConsegnaContenutiApplicativiSender(messaggioServizioApplicativo,
- this.registroServiziReader, this.configurazionePdDReader,
- this.clusterId, this.configurazioneCoda);
- listRunnable.add(new Runnable(sender, -1));
- }
- }
- this.logDebug("Creazione Runnable terminata");
-
- return listRunnable;
- }
- catch(TimerLockNotAvailableException t) {
- // msg diagnostico emesso durante l'emissione dell'eccezione
- this.log.info(t.getMessage(),t);
- }
- catch (Exception e) {
- this.msgDiag.logErroreGenerico(e,"GestioneMessaggiRiconsegnaConsegnaContenutiApplicativi");
- this.logError("Riscontrato errore durante la gestione del repository dei messaggi (Riconsegna verso ConsegnaContenutiApplicativi): "+ e.getMessage(),e);
- }finally{
- if(openspcoopstateGestore!=null)
- openspcoopstateGestore.releaseResource();
- }
-
- return returnNull;
- }
- private void lock(Connection connectionDB, String causale) throws UtilsException, TimerLockNotAvailableException {
- GestoreMessaggi.acquireLock(
- this.semaphore, connectionDB, this.timerLock,
- this.msgDiag, causale,
- this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockAttesaAttiva(),
- this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockCheckInterval());
- }
- private void releaseLock(Connection connectionDB, String causale) {
- try{
- GestoreMessaggi.releaseLock(
- this.semaphore, connectionDB, this.timerLock,
- this.msgDiag, causale);
- }catch(Exception e){
- // ignore
- }
- }
-
- private void logDebug(String msg) {
- this.logDebug(msg, null);
- }
- private void logDebug(String msg, Throwable e) {
- if(e!=null) {
- this.log.debug(getPrefix()+msg, e);
- }
- else {
- this.log.debug(getPrefix()+msg);
- }
- }
-
- private void logError(String msg) {
- this.logError(msg, null);
- }
- private void logError(String msg, Throwable e) {
- if(e!=null) {
- this.log.error(getPrefix()+msg, e);
- }
- else {
- this.log.error(getPrefix()+msg);
- }
- }
-
- private String getPrefix() {
- if(this.configurazioneCoda!=null) {
- return "["+this.configurazioneCoda.getName()+"] ";
- }
- return "";
- }
- }