SmistatoreThread.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */


package org.openspcoop2.web.ctrlstat.gestori;

import java.sql.Connection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.naming.InitialContext;

import org.openspcoop2.core.registry.constants.PddTipologia;
import org.openspcoop2.pdd.config.OpenSPCoop2ConfigurationException;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.transport.jms.ExceptionListenerJMS;
import org.openspcoop2.web.ctrlstat.config.ConsoleProperties;
import org.openspcoop2.web.ctrlstat.config.DatasourceProperties;
import org.openspcoop2.web.ctrlstat.core.ControlStationLogger;
import org.openspcoop2.web.ctrlstat.core.DBManager;
import org.openspcoop2.web.ctrlstat.core.OperazioneDaSmistare;
import org.openspcoop2.web.ctrlstat.costanti.CostantiControlStation;
import org.openspcoop2.web.ctrlstat.costanti.OperationsParameter;
import org.openspcoop2.web.ctrlstat.costanti.TipoOggettoDaSmistare;
import org.openspcoop2.web.ctrlstat.dao.PdDControlStation;
import org.openspcoop2.web.ctrlstat.servlet.pdd.PddCore;
import org.openspcoop2.web.lib.queue.ClassQueue;
import org.openspcoop2.web.lib.queue.QueueOperation;
import org.openspcoop2.web.lib.queue.QueueParameter;
import org.openspcoop2.web.lib.queue.config.QueueProperties;
import org.openspcoop2.web.lib.queue.costanti.Operazione;
import org.openspcoop2.web.lib.queue.costanti.TipoOperazione;
import org.slf4j.Logger;

/**
 * SmistatoreThread
 * 
 * @author Andrea Poli (apoli@link.it)
 * @author Stefano Corallo (corallo@link.it)
 * @author Sandra Giangrandi (sandra@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 * 
 */
public class SmistatoreThread extends Thread {

	/** Logger utilizzato per debug. */
	private static Logger log = null;
	
	/** run */
	private boolean stop = false;
	private boolean isRunning = false;
	public boolean isRunning() {
		return this.isRunning;
	}

	private DBManager dbm;
	private Connection con;

	private ExceptionListenerJMS exceptionListenerJMS = new ExceptionListenerJMS();

	private ConsoleProperties consoleProperties;
	
	private QueueProperties queueProperties;
	
	private DatasourceProperties datasourceProperties;
	
	/** Costruttore 
	 * @throws OpenSPCoop2ConfigurationException */
	public SmistatoreThread() throws OpenSPCoop2ConfigurationException {

		// configuro il logger
		SmistatoreThread.log = ControlStationLogger.getSmistatoreLogger();

		this.dbm = DBManager.getInstance();
		
		this.consoleProperties = ConsoleProperties.getInstance();
		
		this.queueProperties = QueueProperties.getInstance();
		
		this.datasourceProperties = DatasourceProperties.getInstance();
	}

	/**
	 * Metodo che fa partire il Thread.
	 * 
	 * @since 0.4
	 */
	@Override
	public void run() {

		this.isRunning = true;
		
		// Controllo se dbmanager inizializzato
		if (!DBManager.isInitialized()) {
			SmistatoreThread.log.info("Inizializzazione di " + this.getClass().getSimpleName() + " non riuscito perche' DBManager non INIZIALIZZATO");
			SmistatoreThread.log.info(this.getClass().getName() + " Non AVVIATO!");
			return;
		}

		String jmsConnectionFactory = null;
		Properties jmsConnectionFactoryContext = null;
		
		String smistatoreQueue = null;
		String registroServiziQueue = null;
		String gestoreEventiQueue = null;
		String pddQueuePrefix = null;
		
		boolean enginePDD = false;
		boolean engineRegistro = false;
		boolean engineGestoreEventi = false;
		
		boolean singlePdD = true;
		String tipoDatabase = null;
		
		try{
			// Leggo le informazioni da queue.properties
			jmsConnectionFactory = this.queueProperties.getConnectionFactory();
			jmsConnectionFactoryContext = this.queueProperties.getConnectionFactoryContext();
			
			// Leggo le informazioni da console.properties
			
			// nomi code
			smistatoreQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaSmistatore();
			registroServiziQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaRegistroServizi();
			gestoreEventiQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaGestoreEventi();
			pddQueuePrefix = this.consoleProperties.getGestioneCentralizzataPrefissoNomeCodaConfigurazionePdd();
			
			// Abilitazione Engine
			enginePDD = this.consoleProperties.isGestioneCentralizzataSincronizzazionePdd();
			engineRegistro = this.consoleProperties.isGestioneCentralizzataSincronizzazioneRegistro();
			engineGestoreEventi = this.consoleProperties.isGestioneCentralizzataSincronizzazioneGestoreEventi();
			
			// Altre informazioni
			singlePdD = this.consoleProperties.isSinglePdD();
			
			// Database Info
			tipoDatabase = this.datasourceProperties.getTipoDatabase();
			
		}catch(Exception e){
			SmistatoreThread.log.info("Smistatore non avviato, sono stati rilevati errori durante la lettura delle configurazione: "+e.getMessage(),e);
			return;
		}
		
		if (singlePdD) {
			SmistatoreThread.log.info("Smistatore non avviato: govwayConsole avviata in singlePdD mode.");
			return;
		}

		// Configurazione JMS
		SmistatoreThread.log.debug("Smistatore: Avvio Servizio di Gestione Operazioni, Registro[" + engineRegistro + "] Pdd[" + enginePDD + "] GestoreEventi[" + engineGestoreEventi + "]");
		QueueReceiver receiver = null;
		Queue queue = null;
		QueueConnectionFactory qcf = null;
		QueueConnection qc = null;
		QueueSession qs = null;
		boolean trovato = false;
		int i = 0;
		SmistatoreThread.log.debug("Smistatore: Inizializzazione Receiver ...");
		while (!trovato && (i < 600000)) {
			try {
				InitialContext ctx = new InitialContext(jmsConnectionFactoryContext);
				queue = (Queue) ctx.lookup(smistatoreQueue);
				qcf = (QueueConnectionFactory) ctx.lookup(jmsConnectionFactory);
				qc = qcf.createQueueConnection();
				qc.setExceptionListener(this.exceptionListenerJMS);
				qs = qc.createQueueSession(true, -1);
				receiver = qs.createReceiver(queue);
				qc.start();
				ctx.close();
				SmistatoreThread.log.debug("Smistatore: Inizializzazione Receiver effettuata.");
				trovato = true;
			} catch (Exception e) {
				i = i + 10000;
				Utilities.sleep(10000);
			}
		}

		if (!trovato) {
			SmistatoreThread.log.error("Smistatore: Inizializzazione Receiver non effettuata");
			return;
		}

		// Avvio Gestione Operazioni
		boolean riconnessioneConErrore = false;
		while (this.stop == false) {

			try {

				// riconnessione precedente non riuscita.....
				if (riconnessioneConErrore) {
					throw new JMSException("RiconnessioneJMS non riuscita...");
				}
				// Controllo ExceptionListenerJMS
				if (this.exceptionListenerJMS.isConnessioneCorrotta()) {
					SmistatoreThread.log.error("ExceptionJMSListener ha rilevato una connessione jms corrotta", this.exceptionListenerJMS.getException());
					throw new JMSException("ExceptionJMSListener ha rilevato una connessione jms corrotta: " + this.exceptionListenerJMS.getException().getMessage());
				}

				SmistatoreThread.log.info("Smistatore: Ricezione operazione...");
				ObjectMessage richiesta = null;
				while (this.stop == false) {
					richiesta = (ObjectMessage) receiver.receive(CostantiControlStation.INTERVALLO_RECEIVE);
					if (richiesta != null) {
						break;
					}
				}
				if (this.stop == true) {
					break;
				}

				// Ricezione Operazione
				OperazioneDaSmistare operazione = null;
				try {
					operazione = (OperazioneDaSmistare) richiesta.getObject();
				} catch (Exception e) {
					SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con tipo errato:" + e.toString());
					qs.commit();
					continue;
				}

				String idOperazione = richiesta.getStringProperty("ID");

				SmistatoreThread.log.info(CostantiControlStation.OPERATIONS_DELIMITER+"Smistatore: Ricevuta richiesta di operazione con ID: " + idOperazione);
				SmistatoreThread.log.debug("Smistatore: Dati operazione ricevuta idTab[" + operazione.getIDTable() + "] operazione[" + operazione.getOperazione() + "] pdd[" + operazione.getPdd() + "] oggetto[" + operazione.getOggetto() + "]");

				if ((operazione.getOperazione() == null)) {
					SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con parametri scorretti.");
					qs.commit();
					continue;
				}

				if ((Operazione.change.equals(operazione.getOperazione()) == false) && (Operazione.add.equals(operazione.getOperazione()) == false) && (Operazione.del.equals(operazione.getOperazione()) == false)) {
					SmistatoreThread.log.error("Smistatore: Operazione [" + operazione.getOperazione() + "] non supportata dal gestore");
					qs.commit();
					continue;
				}

				if ( (operazione.getOggetto() == null) || (operazione.getIDTable() < 0)) {
					SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con parametri scorretti.");
					qs.commit();
					continue;
				}

				// Guardo che tipo di operazione ho in coda...
				// Se e' un'operazione per il registro, la metto nella coda
				// OperazioniGestoreRegistroServizi
				// Se e' un'operazione per la pdd, la metto nella coda del pdd
				// interessato

				this.con = this.dbm.getConnection();

				Operazione operazioneTipologia = operazione.getOperazione();
				String su = operazione.getSuperuser();
				TipoOggettoDaSmistare tipoOggettoDaSmistare = operazione.getOggetto();
				String pdd = operazione.getPdd();

				// Preparo un oggetto di tipo PetraOperation e poi chiamo
				// la insertQueue
				QueueOperation queueOperation = new QueueOperation();
				queueOperation.setTipoOperazione(TipoOperazione.webService);
				queueOperation.setOperazione(operazioneTipologia);
				queueOperation.setSuperuser(su);

				// disabilito il commit
				this.con.setAutoCommit(false);

				// OggettoClassQueue
				ClassQueue cq = null;
				try {
					cq = new ClassQueue(this.con, tipoDatabase, qs);
				} catch (Exception e) {
					SmistatoreThread.log.error("Smistatore: Inizializzazione ClassQueue non effettuata: " + e.getMessage());
					qs.rollback();
					continue;
				}

				if (tipoOggettoDaSmistare != null) {
					long idTable = operazione.getIDTable();

					String filter = "[" + operazione.getIDTable() + "]";
					filter += "[" + tipoOggettoDaSmistare.name() + "]";
					filter += "[" + operazione.getOperazione() + "]";

					queueOperation.addParametro(new QueueParameter("Oggetto", tipoOggettoDaSmistare.name()));
					queueOperation.addParametro(new QueueParameter("IDTable", "" + idTable));

					Map<OperationsParameter, List<String>> params = operazione.getParameters();
					if(params!=null && !params.isEmpty()) {
						for (OperationsParameter key : params.keySet()) {
							// Per ogni parametro presente nell'operazione da smistare
							// creo un nuovo PetraParameter con nome key.getNome() e
							// valore il valore associato nella tabella
	
							List<String> values = params.get(key);
							for (String value : values) {
								queueOperation.addParametro(new QueueParameter(key.getNome(), value));
								filter += "[" + value + "]";
							}
	
						}
					}

					// Smisto l'operazione

					/* ***** REGISTRO ***** */
					
					if (tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.soggetto) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.servizio) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.ruolo) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.accordo) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.accordoCooperazione) || 
//							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.fruitore) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.pdd)) {
						// Operazione per il registro
						if (engineRegistro) {
							QueueOperation queueOperationRegistro = (QueueOperation) queueOperation.clone();
							queueOperationRegistro.setTipoOperazione(TipoOperazione.webService);
							if (cq.insertQueue(registroServiziQueue, queueOperationRegistro, filter) == 0) {
								SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
								qs.rollback();
								this.con.rollback();
								this.dbm.releaseConnection(this.con);
								continue;
							}
						} else {
							SmistatoreThread.log.info("Smistatore: sincronizzazione Registro Servizi non abilitata.");
						}
					}

					/* ***** PDD ***** */
					
					if ((pdd != null) && !pdd.equals("") && !pdd.equals("-") ) {
						// Operazione per il pdd
						if (enginePDD) {
							// Qualcuno avra' provveduto a creare una coda per
							// il pdd, che si chiama come il pdd stesso
							PddCore pddCore = new PddCore();
							PdDControlStation myPdd = pddCore.getPdDControlStation(pdd);
							String tipoPdd = myPdd.getTipo();

							if (PddTipologia.OPERATIVO.toString().equals(tipoPdd)) {
								QueueOperation queueOperationPdD = (QueueOperation) queueOperation.clone();
								queueOperationPdD.setTipoOperazione(TipoOperazione.webService);
								if (cq.insertQueue(pddQueuePrefix + pdd, queueOperationPdD, filter) == 0) {
									SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
									qs.rollback();
									this.con.rollback();
									this.dbm.releaseConnection(this.con);
									continue;
								}
							} else {
								SmistatoreThread.log.warn("Smistatore: Inserimento in coda non effettuato causa NAL [" + pdd + "] Tipo [" + tipoPdd + "] ");
							}
						} else {
							SmistatoreThread.log.info("Smistatore: sincronizzazione Nal non abilitata.");
						}
					}

					/* ***** Gestore Eventi ***** */
					
					if ( ((gestoreEventiQueue != null) && !gestoreEventiQueue.equals("")) && 
							(
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.soggetto) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.servizio) || 
							tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.mappingFruizionePD)// || 
							//tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.fruitore) 
							)
						){
						// Operazione per il registro
						if (engineGestoreEventi) {
							QueueOperation queueOperationGestoreEventi= (QueueOperation) queueOperation.clone();
							queueOperationGestoreEventi.setTipoOperazione(TipoOperazione.webService);
							if (cq.insertQueue(gestoreEventiQueue, queueOperationGestoreEventi, filter) == 0) {
								SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
								qs.rollback();
								this.con.rollback();
								this.dbm.releaseConnection(this.con);
								continue;
							}
						} else {
							SmistatoreThread.log.info("Smistatore: sincronizzazione Gestore Eventi non abilitata.");
						}
					}
				}

				this.con.commit();
				this.con.setAutoCommit(true);
				this.dbm.releaseConnection(this.con);

				qs.commit();

				SmistatoreThread.log.info("Smistatore: Operazione [" + idOperazione + "] completata.");

			} catch (JMSException e) {
				try {
					qs.rollback();
					this.con.rollback();
					this.dbm.releaseConnection(this.con);
				} catch (Exception er) {
				}
				SmistatoreThread.log.error("Smistatore: Riscontrato erroreJMS durante la gestione di una richiesta: " + e.toString());
				try {
					Utilities.sleep(5000);
					SmistatoreThread.log.debug("Smistatore: Re-Inizializzazione Receiver ...");
					try {
						receiver.close();
					} catch (Exception eclose) {
					}
					try {
						qs.close();
					} catch (Exception eclose) {
					}
					try {
						qc.close();
					} catch (Exception eclose) {
					}
					qc = qcf.createQueueConnection();
					// Ripristino stato Exception Listener
					if (this.exceptionListenerJMS.isConnessioneCorrotta()) {
						this.exceptionListenerJMS.setConnessioneCorrotta(false);
						this.exceptionListenerJMS.setException(null);
					}
					qc.setExceptionListener(this.exceptionListenerJMS);
					qs = qc.createQueueSession(true, -1);
					receiver = qs.createReceiver(queue);
					qc.start();
					SmistatoreThread.log.debug("Smistatore: Re-Inizializzazione Receiver effettuata.");
					riconnessioneConErrore = false;

				} catch (Exception er) {
					SmistatoreThread.log.error("Smistatore: Re-Inizializzazione Receiver non effettuata:" + er.toString());
					riconnessioneConErrore = true;
				}
			} catch (Exception e) {
				try {
					qs.rollback();
					this.con.rollback();
					this.dbm.releaseConnection(this.con);
				} catch (Exception er) {
				}
				SmistatoreThread.log.error("Smistatore: Riscontrato errore durante la gestione di una richiesta: " + e.toString(), e);
			} finally {

				try {
					this.dbm.releaseConnection(this.con);
				} catch (Exception e) {

				}
			}
		}

		// Chiusura connessione
		try {
			if (receiver != null) {
				receiver.close();
			}
			if (qs != null) {
				qs.rollback();
				qs.close();
			}
			if (qc != null) {
				qc.stop();
				qc.close();
			}
		} catch (Exception e) {
			try {
				SmistatoreThread.log.error("Smistatore: Riscontrato errore durante la chiusura del Thread: " + e.toString());
			} catch (Exception eLogger) {
			}
		}
		
		this.isRunning = false;
		log.debug("Thread terminato");
	}

	public void stopGestore() {
		this.stop = true;
		
		SmistatoreThread.log.debug("Fermo il thread ...");
		int timeout = 60;
		for (int i = 0; i < timeout; i++) {
			if(this.isRunning()){
				Utilities.sleep(1000);
			}
			else{
				break;
			}
		}
		if(this.isRunning){
			SmistatoreThread.log.debug("Sono trascorsi 60 secondi ed il thread non รจ ancora terminato??");
		}
	}
}