TimerConsegnaContenutiApplicativi.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */



package org.openspcoop2.pdd.timers;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.openspcoop2.pdd.config.ConfigurazioneCoda;
import org.openspcoop2.pdd.config.ConfigurazionePdDManager;
import org.openspcoop2.pdd.config.ConfigurazionePriorita;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.core.CostantiPdD;
import org.openspcoop2.pdd.core.GestoreMessaggi;
import org.openspcoop2.pdd.core.MessaggioServizioApplicativo;
import org.openspcoop2.pdd.core.state.OpenSPCoopStateDBManager;
import org.openspcoop2.pdd.core.state.OpenSPCoopStateful;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.pdd.services.OpenSPCoop2Startup;
import org.openspcoop2.protocol.engine.constants.Costanti;
import org.openspcoop2.protocol.registry.RegistroServiziManager;
import org.openspcoop2.protocol.sdk.state.StateMessage;
import org.openspcoop2.utils.TipiDatabase;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.UtilsException;
import org.openspcoop2.utils.date.DateManager;
import org.openspcoop2.utils.id.serial.InfoStatistics;
import org.openspcoop2.utils.semaphore.Semaphore;
import org.openspcoop2.utils.semaphore.SemaphoreConfiguration;
import org.openspcoop2.utils.semaphore.SemaphoreMapping;
import org.openspcoop2.utils.threads.IGestoreCodaRunnableInstance;
import org.openspcoop2.utils.threads.Runnable;
import org.openspcoop2.utils.threads.RunnableLogger;



/**
 * Timer che si occupa di re-inoltrare i messaggi in riconsegna
 * 
 *  
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */

public class TimerConsegnaContenutiApplicativi implements IGestoreCodaRunnableInstance  {

	private static TimerState STATE = TimerState.OFF; // abilitato in OpenSPCoop2Startup al momento dell'avvio

	public static TimerState getSTATE() {
		return STATE;
	}
	public static void setSTATE(TimerState sTATE) {
		STATE = sTATE;
	}

	private MsgDiagnostico msgDiag = null;
	private RunnableLogger log;
	private RunnableLogger logSql;
	private OpenSPCoop2Properties propertiesReader = null;
	private boolean debug;
	private String clusterId;

	private RegistroServiziManager registroServiziReader;
	private ConfigurazionePdDManager configurazionePdDReader;

	private ConfigurazioneCoda configurazioneCoda;
	
	private List<ConfigurazionePriorita> configurazioniPriorita;
	
	private TimerLock timerLock = null;

	/** Semaforo */
	private Semaphore semaphore = null;
	private InfoStatistics semaphore_statistics;

	private Date lastCheckMessaggiDaRispedire = null;
	
	public TimerConsegnaContenutiApplicativi(ConfigurazioneCoda configurazioneCoda, MsgDiagnostico msgDiag,
			RunnableLogger log, RunnableLogger logSql,
			OpenSPCoop2Properties p,
			ConfigurazionePdDManager configurazionePdDReader,RegistroServiziManager registroServiziReader) throws TimerException{
		this.configurazioneCoda = configurazioneCoda;
		this.msgDiag = msgDiag;
		this.log = log;
		this.logSql = logSql;
		this.propertiesReader = p;
		this.debug = configurazioneCoda.isDebug();
		this.clusterId = p.getClusterId(false);
		
		this.configurazioniPriorita = new ArrayList<ConfigurazionePriorita>();
		List<String> prioritaList = this.propertiesReader.getTimerConsegnaContenutiApplicativiPriorita();
		for (int i = 0; i < prioritaList.size(); i++) {
			String priorita = prioritaList.get(i);
			this.configurazioniPriorita.add(this.propertiesReader.getTimerConsegnaContenutiApplicativiConfigurazionePriorita(priorita));
		}
		
		this.configurazionePdDReader = configurazionePdDReader;
		this.registroServiziReader = registroServiziReader;

		// deve essere utilizzato lo stesso lock per GestoreMessaggi, ConsegnaContenuti, GestoreBuste per risolvere problema di eliminazione descritto in GestoreMessaggi metodo deleteMessageWithLock 
		//this.timerLock = new TimerLock(TipoLock.GESTIONE_REPOSITORY_MESSAGGI);
		// l'utilizzo commentato sopra era ERRATO: i messaggi salvati con il timer di consegna contenuti applicativi vengono salvati con un nuovo identificativo 'gw-0-e20ae327-791c-40ef-84ad-4b141a0ef93f' e quindi non impattano sul problema descritto sopra.
		this.timerLock = new TimerLock(TipoLock.CONSEGNA_NOTIFICHE, configurazioneCoda.getName());
		
		if(this.propertiesReader.isTimerLockByDatabase()) {
			this.semaphore_statistics = new InfoStatistics();

			SemaphoreConfiguration config = GestoreMessaggi.newSemaphoreConfiguration(this.propertiesReader.getTimerConsegnaContenutiApplicativiLockMaxLife(), 
					this.propertiesReader.getTimerConsegnaContenutiApplicativiLockIdleTime());

			TipiDatabase databaseType = TipiDatabase.toEnumConstant(this.propertiesReader.getDatabaseType());
			try {
				this.semaphore = new Semaphore(this.semaphore_statistics, SemaphoreMapping.newInstance(this.timerLock.getIdLock()), 
						config, databaseType, this.log.getLog());
			}catch(Exception e) {
				throw new TimerException(e.getMessage(),e);
			}
		}
	}

	@Override
	public void initialize(RunnableLogger log) throws UtilsException{
		
		OpenSPCoopStateful openspcoopstateGestore = new OpenSPCoopStateful();
		try {

			this.logDebug("Rilascio eventuali messaggi con lock appesi da riconsegnare verso il modulo ConsegnaContenutiApplicativi ...");
			
			openspcoopstateGestore.initResource(this.propertiesReader.getIdentitaPortaDefaultWithoutProtocol(),TimerConsegnaContenutiApplicativiThread.ID_MODULO, "initialize",
					OpenSPCoopStateDBManager.smistatoreMessaggiPresiInCarico);
			Connection connectionDB = ((StateMessage)openspcoopstateGestore.getStatoRichiesta()).getConnectionDB();

			// GestoreMessaggi da Ricercare
			GestoreMessaggi gestoreMsgSearch = new GestoreMessaggi(openspcoopstateGestore, true,this.logSql.getLog(),this.msgDiag, null);
	
			String causaMessaggiINBOXDaRiconsegnare = "Rilascio eventuali messaggi con lock appesi da riconsegnare verso il modulo ConsegnaContenutiApplicativi";
			try{
				GestoreMessaggi.acquireLock(
						this.semaphore, connectionDB, this.timerLock,
						this.msgDiag, causaMessaggiINBOXDaRiconsegnare, 
						this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockAttesaAttiva(), 
						this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockCheckInterval());
			
				gestoreMsgSearch.releaseMessaggiPresaInCosegna(this.configurazioneCoda.getName(), this.clusterId, this.debug, this.logSql);
				
			}finally{
				try{
					GestoreMessaggi.releaseLock(
							this.semaphore, connectionDB, this.timerLock,
							this.msgDiag, causaMessaggiINBOXDaRiconsegnare);
				}catch(Exception e){
					// ignore
				}
			}
			
			this.logDebug("Rilascio effettuato di eventuali messaggi con lock appesi da riconsegnare verso il modulo ConsegnaContenutiApplicativi");
		}
		catch (Exception e) {
			this.msgDiag.logErroreGenerico(e,"GestioneMessaggiRilascioLockRiconsegnaConsegnaContenutiApplicativi");
			this.logError("Riscontrato errore durante la gestione del repository dei messaggi (Rilascio lock per riconsegna verso ConsegnaContenutiApplicativi): "+ e.getMessage(),e);
		}finally{
			if(openspcoopstateGestore!=null)
				openspcoopstateGestore.releaseResource();
		}
	}
	
	private static final String DATA_START = "DATA_START";
	
	@Override
	public void logCheckInProgress(Map<String, Object> context) {
		// Prendo la gestione
		this.msgDiag.logPersonalizzato("controlloInCorso");
		this.log.info(this.msgDiag.getMessaggio_replaceKeywords("controlloInCorso"));
		long startControlloRepositoryMessaggi = DateManager.getTimeMillis();
		context.put(DATA_START, startControlloRepositoryMessaggi);
	}
	
	@Override
	public void logRegisteredThreads(Map<String, Object> context, int nuoviThreadsAttivati) {
		if(nuoviThreadsAttivati>0){
			this.msgDiag.addKeyword(CostantiPdD.KEY_TIMER_CONSEGNA_CONTENUTI_APPLICATIVI_NUMERO_MESSAGGI_INOLTRATI,nuoviThreadsAttivati+"");
			this.msgDiag.logPersonalizzato("ricercaMessaggiDaInoltrare");
		}
	}
	
	@Override
	public void logCheckFinished(Map<String, Object> context) {
		
		// end
		long endControlloRepositoryMessaggi = DateManager.getTimeMillis();
		long startControlloRepositoryMessaggi = (Long) context.get(DATA_START);
		long diff = (endControlloRepositoryMessaggi-startControlloRepositoryMessaggi);
		this.log.info("Controllo Repository Messaggi (Riconsegna verso ConsegnaContenutiApplicativi) terminato in "+Utilities.convertSystemTimeIntoStringMillisecondi(diff, true));

	}
	
	
	@Override
	public List<Runnable> nextRunnable(int limit) throws UtilsException{
			
		List<Runnable> returnNull = null;
		
		// Controllo che il sistema non sia andando in shutdown
		if(OpenSPCoop2Startup.contextDestroyed){
			this.logError("Rilevato sistema in shutdown");
			return returnNull;
		}

		// Controllo che l'inizializzazione corretta delle risorse sia effettuata
		if(!OpenSPCoop2Startup.initialize){
			this.msgDiag.logFatalError("inizializzazione di OpenSPCoop non effettuata", "Check Inizializzazione");
			String msgErrore = "Riscontrato errore: inizializzazione del Timer o di OpenSPCoop non effettuata";
			this.logError(msgErrore);
			throw new UtilsException(msgErrore);
		}

		// Controllo risorse di sistema disponibili
		if( !TimerMonitoraggioRisorseThread.isRisorseDisponibili()){
			this.logError("Risorse di sistema non disponibili: "+TimerMonitoraggioRisorseThread.getRisorsaNonDisponibile().getMessage(),TimerMonitoraggioRisorseThread.getRisorsaNonDisponibile());
			return returnNull;
		}
		if( !MsgDiagnostico.gestoreDiagnosticaDisponibile){
			this.logError("Sistema di diagnostica non disponibile: "+MsgDiagnostico.motivoMalfunzionamentoDiagnostici.getMessage(),MsgDiagnostico.motivoMalfunzionamentoDiagnostici);
			return returnNull;
		}
		
		// Controllo che il timer non sia stato momentaneamente disabilitato
		if(!TimerState.ENABLED.equals(STATE)) {
			this.msgDiag.logPersonalizzato("disabilitato");
			this.log.info(this.msgDiag.getMessaggio_replaceKeywords("disabilitato"));
			return returnNull;
		}
		
		OpenSPCoopStateful openspcoopstateGestore = new OpenSPCoopStateful();
		try {

			this.logDebug("Inizializzazione connessione al db per ricercare nuovi threads da attivare (limit: "+limit+") ...");
			openspcoopstateGestore.initResource(this.propertiesReader.getIdentitaPortaDefaultWithoutProtocol(),TimerConsegnaContenutiApplicativiThread.ID_MODULO, "nextRunnable",
					OpenSPCoopStateDBManager.smistatoreMessaggiPresiInCarico);
			Connection connectionDB = ((StateMessage)openspcoopstateGestore.getStatoRichiesta()).getConnectionDB();

			boolean verificaPresenzaMessaggiDaRispedire = false;
			boolean calcolaDataMinimaMessaggiRispedire = false;
			Integer secondiAnzianitaPerIniziareSpedireNuovoMessaggio = this.configurazioneCoda.getScheduleMessageAfter();
			if(this.lastCheckMessaggiDaRispedire==null) {
				this.lastCheckMessaggiDaRispedire=DateManager.getDate();
				verificaPresenzaMessaggiDaRispedire = true;
			}
			else {
				Date expired = new Date(DateManager.getTimeMillis()-(1000*this.configurazioneCoda.getNextMessages_consegnaFallita_intervalloControllo()));
				this.logDebug("("+this.configurazioneCoda.getName()+") Verifica check messaggi da spedire previsto quando ultimo chek '"+
						org.openspcoop2.utils.date.DateUtils.getSimpleDateFormatMs().format(this.lastCheckMessaggiDaRispedire)+"' < '"+org.openspcoop2.utils.date.DateUtils.getSimpleDateFormatMs().format(expired)+"' ...");
				if(this.lastCheckMessaggiDaRispedire.before(expired)) {
					verificaPresenzaMessaggiDaRispedire = true;
					this.lastCheckMessaggiDaRispedire=DateManager.getDate();
				}
			}
			if(verificaPresenzaMessaggiDaRispedire) {
				calcolaDataMinimaMessaggiRispedire = this.configurazioneCoda.isNextMessages_consegnaFallita_calcolaDataMinimaRiconsegna();
			}
			this.logDebug("("+this.configurazioneCoda.getName()+") verificaPresenzaMessaggiDaRispedire:"+verificaPresenzaMessaggiDaRispedire+" calcolaDataMinimaMessaggiRispedire:"+calcolaDataMinimaMessaggiRispedire);

			
			// GestoreMessaggi da Ricercare
			GestoreMessaggi gestoreMsgSearch = new GestoreMessaggi(openspcoopstateGestore, true,this.logSql.getLog(),this.msgDiag, null);
			
			Date now = DateManager.getDate();
			List<MessaggioServizioApplicativo> msgDaRiconsegnareINBOX = new ArrayList<>();
			
			// APPLICATIVI PRIORITARI
			List<String> serviziApplicativiPrioritari = this.configurazionePdDReader.getServiziApplicativiConsegnaNotifichePrioritarie(this.configurazioneCoda.getName());
			if(serviziApplicativiPrioritari!=null && !serviziApplicativiPrioritari.isEmpty()) {
				
				String prefix = "[Applicativi Prioritari] ";
				String causale = prefix+"Messaggi da riconsegnare verso il modulo ConsegnaContenutiApplicativi";
				try {
					this.logDebug(prefix+"Acquisizione lock per ricercare nuovi threads da attivare (limit: "+limit+") ...");
					this.lock(connectionDB, causale);
					
					this.logDebug(prefix+"Lock acquisito, ricerca nuovi threads da attivare (limit: "+limit+") ...");
					List<MessaggioServizioApplicativo> msgDaRiconsegnareINBOX_priorita = 
							gestoreMsgSearch.readMessaggiDaRiconsegnareIntoBoxByServiziApplicativPrioritari(limit,
									verificaPresenzaMessaggiDaRispedire, calcolaDataMinimaMessaggiRispedire,secondiAnzianitaPerIniziareSpedireNuovoMessaggio,
									now,
									this.propertiesReader.getTimerConsegnaContenutiApplicativiPresaInConsegnaMaxLife(),
									this.debug,this.logSql,
									this.configurazioneCoda.getName(),
									serviziApplicativiPrioritari.toArray(new String[1]));
					
					if(msgDaRiconsegnareINBOX_priorita!=null && !msgDaRiconsegnareINBOX_priorita.isEmpty()) {
						this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limit+"); prendo in carico "+
								msgDaRiconsegnareINBOX_priorita.size()+" messaggi ...");
						
						for (MessaggioServizioApplicativo messaggioServizioApplicativo : msgDaRiconsegnareINBOX_priorita) {
							GestoreMessaggi messaggioDaInviare = new GestoreMessaggi(openspcoopstateGestore,true,messaggioServizioApplicativo.getIdMessaggio(),Costanti.INBOX,
									this.log.getLog(),this.msgDiag,null);
							messaggioDaInviare.updateMessaggioPresaInCosegna(messaggioServizioApplicativo.getServizioApplicativo(), 
									this.clusterId, this.debug, this.logSql);
							
							msgDaRiconsegnareINBOX.add(messaggioServizioApplicativo);
						}
						
						this.logDebug(prefix+"Presa in carico "+msgDaRiconsegnareINBOX_priorita.size()+" messaggi terminata, rilascio lock...");
					}
					else {
						this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limit+"); non sono presenti messaggi, rilascio lock...");
					}
					
				}finally{
					this.releaseLock(connectionDB, causale);
				}
				this.logDebug(prefix+"Lock rilasciato");
			}
			
			int finestraAncoraDisponibileDopoApplicativiPrioritari = limit - msgDaRiconsegnareINBOX.size();
						
			if(finestraAncoraDisponibileDopoApplicativiPrioritari>0) {
				for (ConfigurazionePriorita configurazionePriorita : this.configurazioniPriorita) {
	
					String prefix = "[P-"+configurazionePriorita.getLabel()+"] ";
					
					int limitPriorita = 0;
					if(configurazionePriorita.isNessunaPriorita()) {
						limitPriorita = limit - msgDaRiconsegnareINBOX.size();
						this.logDebug(prefix+"Calcolo limit; cerco entries rimaste senza guardare la priorità ("+limitPriorita+") ...");
					}
					else {
						limitPriorita = (finestraAncoraDisponibileDopoApplicativiPrioritari * configurazionePriorita.getPercentuale()) / 100;
						this.logDebug(prefix+"Calcolo limit; "+configurazionePriorita.getPercentuale()+"% di "+finestraAncoraDisponibileDopoApplicativiPrioritari+": "+limitPriorita+" ...");
					}
					if(limitPriorita<=0) {
						this.logDebug(prefix+"Per la priorità non è necessario cercare alcun messaggio");
						continue;
					}
					
					String causale = prefix+"Messaggi da riconsegnare verso il modulo ConsegnaContenutiApplicativi";
					try {
						this.logDebug(prefix+"Acquisizione lock per ricercare nuovi threads da attivare (limit: "+limitPriorita+") ...");
						this.lock(connectionDB, causale);
						
						this.logDebug(prefix+"Lock acquisito, ricerca nuovi threads da attivare (limit: "+limitPriorita+") ...");
						List<MessaggioServizioApplicativo> msgDaRiconsegnareINBOXpriorita = 
								gestoreMsgSearch.readMessaggiDaRiconsegnareIntoBoxByPriorita(limitPriorita,
										verificaPresenzaMessaggiDaRispedire, calcolaDataMinimaMessaggiRispedire, secondiAnzianitaPerIniziareSpedireNuovoMessaggio,
										now,
										this.propertiesReader.getTimerConsegnaContenutiApplicativiPresaInConsegnaMaxLife(),
										this.debug,this.logSql,
										this.configurazioneCoda.getName(),
										configurazionePriorita.isNessunaPriorita() ? null : configurazionePriorita.getName());
						
						if(msgDaRiconsegnareINBOXpriorita!=null && !msgDaRiconsegnareINBOXpriorita.isEmpty()) {
							this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limitPriorita+"); prendo in carico "+
									msgDaRiconsegnareINBOXpriorita.size()+" messaggi ...");
							
							// La finestra non deve essere ricalcolata
							/**finestraAncoraDisponibileDopoApplicativiPrioritari = finestraAncoraDisponibileDopoApplicativiPrioritari - msgDaRiconsegnareINBOX_priorita.size();*/
							
							for (MessaggioServizioApplicativo messaggioServizioApplicativo : msgDaRiconsegnareINBOXpriorita) {
								GestoreMessaggi messaggioDaInviare = new GestoreMessaggi(openspcoopstateGestore,true,messaggioServizioApplicativo.getIdMessaggio(),Costanti.INBOX,
										this.log.getLog(),this.msgDiag,null);
								messaggioDaInviare.updateMessaggioPresaInCosegna(messaggioServizioApplicativo.getServizioApplicativo(), 
										this.clusterId, this.debug, this.logSql);
								
								msgDaRiconsegnareINBOX.add(messaggioServizioApplicativo);
							}
							
							this.logDebug(prefix+"Presa in carico "+msgDaRiconsegnareINBOXpriorita.size()+" messaggi terminata, rilascio lock...");
						}
						else {
							this.logDebug(prefix+"Ricerca nuovi threads da attivare terminata (limit: "+limitPriorita+"); non sono presenti messaggi, rilascio lock...");
						}
						
					}finally{
						this.releaseLock(connectionDB, causale);
					}
					this.logDebug(prefix+"Lock rilasciato");
					
				}
			}
			
			this.logDebug("Creazione Runnable ...");
			List<Runnable> listRunnable = null;
			if(msgDaRiconsegnareINBOX!=null && !msgDaRiconsegnareINBOX.isEmpty()) {
				listRunnable = new ArrayList<>();
				for (MessaggioServizioApplicativo messaggioServizioApplicativo : msgDaRiconsegnareINBOX) {
					TimerConsegnaContenutiApplicativiSender sender = 
							new TimerConsegnaContenutiApplicativiSender(messaggioServizioApplicativo, 
									this.registroServiziReader, this.configurazionePdDReader, 
									this.clusterId, this.configurazioneCoda);
					listRunnable.add(new Runnable(sender, -1));
				}
			}
			this.logDebug("Creazione Runnable terminata");
			
			return listRunnable;
		} 
		catch(TimerLockNotAvailableException t) {
			// msg diagnostico emesso durante l'emissione dell'eccezione
			this.log.info(t.getMessage(),t);
		}
		catch (Exception e) {
			this.msgDiag.logErroreGenerico(e,"GestioneMessaggiRiconsegnaConsegnaContenutiApplicativi");
			this.logError("Riscontrato errore durante la gestione del repository dei messaggi (Riconsegna verso ConsegnaContenutiApplicativi): "+ e.getMessage(),e);
		}finally{
			if(openspcoopstateGestore!=null)
				openspcoopstateGestore.releaseResource();
		}
		
		return returnNull;
	}

	private void lock(Connection connectionDB, String causale) throws UtilsException, TimerLockNotAvailableException {
		GestoreMessaggi.acquireLock(
				this.semaphore, connectionDB, this.timerLock,
				this.msgDiag, causale, 
				this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockAttesaAttiva(), 
				this.propertiesReader.getTimerConsegnaContenutiApplicativi_getLockCheckInterval());
	}
	private void releaseLock(Connection connectionDB, String causale) {
		try{
			GestoreMessaggi.releaseLock(
					this.semaphore, connectionDB, this.timerLock,
					this.msgDiag, causale);
		}catch(Exception e){
			// ignore
		}
	}
	
	private void logDebug(String msg) {
		this.logDebug(msg, null);
	}
	private void logDebug(String msg, Throwable e) {
		if(e!=null) {
			this.log.debug(getPrefix()+msg, e);
		}
		else {
			this.log.debug(getPrefix()+msg);
		}
	}
	
	private void logError(String msg) {
		this.logError(msg, null);
	}
	private void logError(String msg, Throwable e) {
		if(e!=null) {
			this.log.error(getPrefix()+msg, e);
		}
		else {
			this.log.error(getPrefix()+msg);
		}
	}
	
	private String getPrefix() {
		if(this.configurazioneCoda!=null) {
			return "["+this.configurazioneCoda.getName()+"] ";
		}
		return "";
	}
}