TimerGestorePuliziaMessaggiAnomaliLib.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */



package org.openspcoop2.pdd.timers;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.core.CostantiPdD;
import org.openspcoop2.pdd.core.GestoreMessaggi;
import org.openspcoop2.pdd.core.state.OpenSPCoopStateful;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.pdd.services.OpenSPCoop2Startup;
import org.openspcoop2.protocol.engine.constants.Costanti;
import org.openspcoop2.protocol.engine.driver.RollbackRepositoryBuste;
import org.openspcoop2.protocol.sdk.state.StateMessage;
import org.openspcoop2.utils.TipiDatabase;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.date.DateManager;
import org.openspcoop2.utils.id.serial.InfoStatistics;
import org.openspcoop2.utils.semaphore.Semaphore;
import org.openspcoop2.utils.semaphore.SemaphoreConfiguration;
import org.openspcoop2.utils.semaphore.SemaphoreMapping;
import org.slf4j.Logger;

/**
 * Implementazione dell'interfaccia {@link TimerGestorePuliziaMessaggiAnomali} del Gestore
 * dei threads di servizio di OpenSPCoop.
 * 
 *  
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */

public class TimerGestorePuliziaMessaggiAnomaliLib{

	private static TimerState STATE = TimerState.OFF; // abilitato in OpenSPCoop2Startup al momento dell'avvio
	
	public static TimerState getSTATE() {
		return STATE;
	}
	public static void setSTATE(TimerState sTATE) {
		STATE = sTATE;
	}

	private MsgDiagnostico msgDiag = null;
	private Logger logTimer = null;
	private OpenSPCoop2Properties propertiesReader = null;
	private boolean logQuery = false;
	private int limit = CostantiPdD.LIMIT_MESSAGGI_GESTORI;
	private boolean orderByQuery;

	private TimerLock timerLock = null;

	/** Semaforo */
	private Semaphore semaphore = null;
	private InfoStatistics semaphore_statistics;

	public TimerGestorePuliziaMessaggiAnomaliLib(MsgDiagnostico msgDiag,Logger log,OpenSPCoop2Properties p,boolean logQuery,
			int limit,boolean orderByQuery) throws TimerException{
		this.msgDiag = msgDiag;
		this.logTimer = log;
		this.propertiesReader = p;
		this.logQuery = logQuery;
		this.limit = limit;
		this.orderByQuery = orderByQuery;
		
		this.timerLock = new TimerLock(TipoLock.GESTIONE_PULIZIA_MESSAGGI_ANOMALI);

		if(this.propertiesReader.isTimerLockByDatabase()) {
			this.semaphore_statistics = new InfoStatistics();

			SemaphoreConfiguration config = GestoreMessaggi.newSemaphoreConfiguration(this.propertiesReader.getTimerGestorePuliziaMessaggiAnomaliLockMaxLife(), 
					this.propertiesReader.getTimerGestorePuliziaMessaggiAnomaliLockIdleTime());

			TipiDatabase databaseType = TipiDatabase.toEnumConstant(this.propertiesReader.getDatabaseType());
			try {
				this.semaphore = new Semaphore(this.semaphore_statistics, SemaphoreMapping.newInstance(this.timerLock.getIdLock()), 
						config, databaseType, this.logTimer);
			}catch(Exception e) {
				throw new TimerException(e.getMessage(),e);
			}
		}
	}

	public void check() throws TimerException {

		// Controllo che il sistema non sia andando in shutdown
		if(OpenSPCoop2Startup.contextDestroyed){
			this.logTimer.error("["+TimerGestorePuliziaMessaggiAnomali.ID_MODULO+"] Rilevato sistema in shutdown");
			return;
		}

		// Controllo che l'inizializzazione corretta delle risorse sia effettuata
		if(!OpenSPCoop2Startup.initialize){
			this.msgDiag.logFatalError("inizializzazione di OpenSPCoop non effettuata", "Check Inizializzazione");
			String msgErrore = "Riscontrato errore: inizializzazione del Timer o di OpenSPCoop non effettuata";
			this.logTimer.error(msgErrore);
			throw new TimerException(msgErrore);
		}

		// Controllo risorse di sistema disponibili
		if( !TimerMonitoraggioRisorseThread.isRisorseDisponibili()){
			this.logTimer.error("["+TimerGestorePuliziaMessaggiAnomali.ID_MODULO+"] Risorse di sistema non disponibili: "+TimerMonitoraggioRisorseThread.getRisorsaNonDisponibile().getMessage(),TimerMonitoraggioRisorseThread.getRisorsaNonDisponibile());
			return;
		}
		if( !MsgDiagnostico.gestoreDiagnosticaDisponibile){
			this.logTimer.error("["+TimerGestorePuliziaMessaggiAnomali.ID_MODULO+"] Sistema di diagnostica non disponibile: "+MsgDiagnostico.motivoMalfunzionamentoDiagnostici.getMessage(),MsgDiagnostico.motivoMalfunzionamentoDiagnostici);
			return;
		}
		
		// Controllo che il timer non sia stato momentaneamente disabilitato
		if(!TimerState.ENABLED.equals(STATE)) {
			this.msgDiag.logPersonalizzato("disabilitato");
			this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("disabilitato"));
			return;
		}


		this.msgDiag.logPersonalizzato("controlloInCorso");
		this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("controlloInCorso"));
		long startControlloRepositoryMessaggi = DateManager.getTimeMillis();


		RollbackRepositoryBuste rollbackMessaggio = null;

		OpenSPCoopStateful openspcoopstate = new OpenSPCoopStateful();

		try {

			openspcoopstate.initResource(this.propertiesReader.getIdentitaPortaDefaultWithoutProtocol(), TimerGestorePuliziaMessaggiAnomali.ID_MODULO, null);
			Connection connectionDB = ((StateMessage)openspcoopstate.getStatoRichiesta()).getConnectionDB();
			

			// Messaggi da eliminare 
			GestoreMessaggi gestoreMsgSearch = new GestoreMessaggi(openspcoopstate, true,this.logTimer, this.msgDiag,null);
			String causaMessaggiDaRipulire = "Eliminazione buste non più riferite da messaggi";
			List<String> busteInutiliINBOX = null;
			List<String> busteInutiliOUTBOX = null;
			try{
				GestoreMessaggi.acquireLock(
						this.semaphore, connectionDB, this.timerLock,
						this.msgDiag, causaMessaggiDaRipulire, 
						this.propertiesReader.getTimerGestorePuliziaMessaggiAnomali_getLockAttesaAttiva(), 
						this.propertiesReader.getTimerGestorePuliziaMessaggiAnomali_getLockCheckInterval());
				
				busteInutiliINBOX = gestoreMsgSearch.readBusteNonRiferiteDaMessaggiFromInBox(this.limit,this.logQuery,
						this.propertiesReader.isForceIndex(),this.orderByQuery);
				busteInutiliOUTBOX = gestoreMsgSearch.readBusteNonRiferiteDaMessaggiFromOutBox(this.limit,this.logQuery,
						this.propertiesReader.isForceIndex(),this.orderByQuery);
	
				if(this.logQuery){
					if( ! (busteInutiliINBOX.size()>0 ||   busteInutiliOUTBOX.size() > 0)){
						this.logTimer.info("Non sono stati trovati messaggi anomali");
					}
				}
	
	
				while( busteInutiliINBOX.size()>0 ||
						busteInutiliOUTBOX.size() > 0){
	
					this.msgDiag.addKeyword(CostantiPdD.KEY_TIMER_GESTORE_MESSAGGI_INCONSISTENTI_NUM_MSG_INBOX, busteInutiliINBOX.size()+"");
					this.msgDiag.addKeyword(CostantiPdD.KEY_TIMER_GESTORE_MESSAGGI_INCONSISTENTI_NUM_MSG_OUTBOX, busteInutiliOUTBOX.size()+"");
	
					this.msgDiag.logPersonalizzato("ricercaMessaggiDaEliminare");
	
	
					// Buste non piu' riferite da messaggi
					// Eliminazione Messaggi from INBOX
					if(this.logQuery)
						this.logTimer.info("Trovate "+busteInutiliINBOX.size()+" buste non piu' riferita da messaggi da eliminare (INBOX) ...");
					this.msgDiag.addKeyword(CostantiPdD.KEY_TIPO_MESSAGGIO,Costanti.INBOX);
					for(int i=0; i<busteInutiliINBOX.size(); i++){
	
						String idMsgDaEliminare = busteInutiliINBOX.get(i);
						this.msgDiag.addKeyword(CostantiPdD.KEY_ID_MESSAGGIO_DA_ELIMINARE,idMsgDaEliminare);
	
						try{
							GestoreMessaggi.updateLock(
									this.semaphore, connectionDB, this.timerLock,
									this.msgDiag, "Eliminazione busta anomala in INBOX con id ["+idMsgDaEliminare+"] ...");
						}catch(Throwable e){
							this.msgDiag.logErroreGenerico(e,"EliminazioneBustaInbox("+idMsgDaEliminare+")-UpdateLock");
							this.logTimer.error("ErroreEliminazioneBustaInbox("+idMsgDaEliminare+")-UpdateLock: "+e.getMessage(),e);
							break;
						}
						
						try{
							//	rollback messaggio scaduto (eventuale profilo + accesso_pdd)
							rollbackMessaggio = new RollbackRepositoryBuste(idMsgDaEliminare,openspcoopstate.getStatoRichiesta(),true);
							rollbackMessaggio.rollbackBustaIntoInBox(false); //l'history viene eliminato con la scadenza della busta
							((StateMessage)openspcoopstate.getStatoRichiesta()).executePreparedStatement();
	
							this.msgDiag.logPersonalizzato("eliminazioneMessaggio");
							if(this.logQuery){
								this.logTimer.debug(this.msgDiag.getMessaggio_replaceKeywords("eliminazioneMessaggio"));
							}
	
						}catch(Exception e){
							if(rollbackMessaggio!=null){
								((StateMessage)openspcoopstate.getStatoRichiesta()).closePreparedStatement();
								rollbackMessaggio = null;
							}
							this.msgDiag.logErroreGenerico(e,"EliminazioneBustaInbox("+idMsgDaEliminare+")");
							this.logTimer.error("ErroreEliminazioneBustaInbox("+idMsgDaEliminare+"): "+e.getMessage(),e);
						}
					}
					if(this.logQuery)
						this.logTimer.info("Eliminate "+busteInutiliINBOX.size()+" buste non piu' riferita da messaggi (INBOX)");
	
	
					// Buste non piu' riferite da messaggi
					// Eliminazione Messaggi from OUTBOX
					if(this.logQuery)
						this.logTimer.info("Trovate "+busteInutiliOUTBOX.size()+" buste non piu' riferita da messaggi da eliminare (OUTBOX) ...");
					this.msgDiag.addKeyword(CostantiPdD.KEY_TIPO_MESSAGGIO,Costanti.OUTBOX);
					for(int i=0; i<busteInutiliOUTBOX.size(); i++){
	
						String idMsgDaEliminare = busteInutiliOUTBOX.get(i);
						this.msgDiag.addKeyword(CostantiPdD.KEY_ID_MESSAGGIO_DA_ELIMINARE,idMsgDaEliminare);
	
						try{
							GestoreMessaggi.updateLock(
									this.semaphore, connectionDB, this.timerLock,
									this.msgDiag, "Eliminazione busta anomala in OUTBOX con id ["+idMsgDaEliminare+"] ...");
						}catch(Throwable e){
							this.msgDiag.logErroreGenerico(e,"EliminazioneBustaOutbox("+idMsgDaEliminare+")-UpdateLock");
							this.logTimer.error("EliminazioneBustaOutbox("+idMsgDaEliminare+")-UpdateLock: "+e.getMessage(),e);
							break;
						}
						
						try{
							//	rollback messaggio scaduto (eventuale profilo + accesso_pdd)
							rollbackMessaggio = new RollbackRepositoryBuste(idMsgDaEliminare,openspcoopstate.getStatoRichiesta(),true);
							rollbackMessaggio.rollbackBustaIntoOutBox(false); //l'history viene eliminato con la scadenza della busta
							((StateMessage)openspcoopstate.getStatoRichiesta()).executePreparedStatement();
	
							this.msgDiag.logPersonalizzato("eliminazioneMessaggio");
							if(this.logQuery){
								this.logTimer.debug(this.msgDiag.getMessaggio_replaceKeywords("eliminazioneMessaggio"));
							}
	
						}catch(Exception e){
							if(rollbackMessaggio!=null){
								((StateMessage)openspcoopstate.getStatoRichiesta()).closePreparedStatement();
								rollbackMessaggio = null;
							}
							this.msgDiag.logErroreGenerico(e,"EliminazioneBustaOutbox("+idMsgDaEliminare+")");
							this.logTimer.error("EliminazioneBustaOutbox("+idMsgDaEliminare+"): "+e.getMessage(),e);
						}
					}
					if(this.logQuery)
						this.logTimer.info("Eliminate "+busteInutiliOUTBOX.size()+" buste non piu' riferita da messaggi (OUTBOX)");
	
					boolean cerca = true;
					try{
						GestoreMessaggi.updateLock(
								this.semaphore, connectionDB, this.timerLock,
								this.msgDiag, "Ricerca nuove buste non più riferite da messaggi ...");						
					}catch(Throwable e){
						this.msgDiag.logErroreGenerico(e,"RicercaNuoveBusteOneWayNonRiscontrate-UpdateLock");
						this.logTimer.error("ErroreRicercaNuoveBusteOneWayNonRiscontrate-UpdateLock: "+e.getMessage(),e);
						cerca = false;
					}
					
					// Check altri riscontri da inviare
					if(cerca) {
						// Check altri messaggi da eliminare
						busteInutiliINBOX = gestoreMsgSearch.readBusteNonRiferiteDaMessaggiFromInBox(this.limit,this.logQuery,
								this.propertiesReader.isForceIndex(),this.orderByQuery);
						busteInutiliOUTBOX = gestoreMsgSearch.readBusteNonRiferiteDaMessaggiFromOutBox(this.limit,this.logQuery,
								this.propertiesReader.isForceIndex(),this.orderByQuery);
					}
					else {
						busteInutiliINBOX = new ArrayList<>(); // per uscire dal while
						busteInutiliOUTBOX = new ArrayList<>(); // per uscire dal while
					}
	
				}
			}finally{
				try{
					GestoreMessaggi.releaseLock(
							this.semaphore, connectionDB, this.timerLock,
							this.msgDiag, causaMessaggiDaRipulire);
				}catch(Exception e){
					// ignore
				}
			}
				

			// end
			long endControlloRepositoryMessaggi = DateManager.getTimeMillis();
			long diff = (endControlloRepositoryMessaggi-startControlloRepositoryMessaggi);
			this.logTimer.info("Pulizia Messaggi Anomali terminata in "+Utilities.convertSystemTimeIntoStringMillisecondi(diff, true));

		} 
		catch(TimerLockNotAvailableException t) {
			// msg diagnostico emesso durante l'emissione dell'eccezione
			this.logTimer.info(t.getMessage(),t);
		}
		catch (Exception e) {
			this.msgDiag.logErroreGenerico(e,"GestioneMessaggiInconsistenti");
			this.logTimer.error("Riscontrato errore durante la pulizia dei messaggi anomali: "+e.getMessage(),e);
		}finally{
			if(openspcoopstate!=null)
				openspcoopstate.releaseResource();
		}
	}

}