SmistatoreThread.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.web.ctrlstat.gestori;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.naming.InitialContext;
import org.openspcoop2.core.registry.constants.PddTipologia;
import org.openspcoop2.pdd.config.OpenSPCoop2ConfigurationException;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.transport.jms.ExceptionListenerJMS;
import org.openspcoop2.web.ctrlstat.config.ConsoleProperties;
import org.openspcoop2.web.ctrlstat.config.DatasourceProperties;
import org.openspcoop2.web.ctrlstat.core.ControlStationLogger;
import org.openspcoop2.web.ctrlstat.core.DBManager;
import org.openspcoop2.web.ctrlstat.core.OperazioneDaSmistare;
import org.openspcoop2.web.ctrlstat.costanti.CostantiControlStation;
import org.openspcoop2.web.ctrlstat.costanti.OperationsParameter;
import org.openspcoop2.web.ctrlstat.costanti.TipoOggettoDaSmistare;
import org.openspcoop2.web.ctrlstat.dao.PdDControlStation;
import org.openspcoop2.web.ctrlstat.servlet.pdd.PddCore;
import org.openspcoop2.web.lib.queue.ClassQueue;
import org.openspcoop2.web.lib.queue.QueueOperation;
import org.openspcoop2.web.lib.queue.QueueParameter;
import org.openspcoop2.web.lib.queue.config.QueueProperties;
import org.openspcoop2.web.lib.queue.costanti.Operazione;
import org.openspcoop2.web.lib.queue.costanti.TipoOperazione;
import org.slf4j.Logger;
/**
* SmistatoreThread
*
* @author Andrea Poli (apoli@link.it)
* @author Stefano Corallo (corallo@link.it)
* @author Sandra Giangrandi (sandra@link.it)
* @author $Author$
* @version $Rev$, $Date$
*
*/
public class SmistatoreThread extends Thread {
/** Logger utilizzato per debug. */
private static Logger log = null;
/** run */
private boolean stop = false;
private boolean isRunning = false;
public boolean isRunning() {
return this.isRunning;
}
private DBManager dbm;
private Connection con;
private ExceptionListenerJMS exceptionListenerJMS = new ExceptionListenerJMS();
private ConsoleProperties consoleProperties;
private QueueProperties queueProperties;
private DatasourceProperties datasourceProperties;
/** Costruttore
* @throws OpenSPCoop2ConfigurationException */
public SmistatoreThread() throws OpenSPCoop2ConfigurationException {
// configuro il logger
SmistatoreThread.log = ControlStationLogger.getSmistatoreLogger();
this.dbm = DBManager.getInstance();
this.consoleProperties = ConsoleProperties.getInstance();
this.queueProperties = QueueProperties.getInstance();
this.datasourceProperties = DatasourceProperties.getInstance();
}
/**
* Metodo che fa partire il Thread.
*
* @since 0.4
*/
@Override
public void run() {
this.isRunning = true;
// Controllo se dbmanager inizializzato
if (!DBManager.isInitialized()) {
SmistatoreThread.log.info("Inizializzazione di " + this.getClass().getSimpleName() + " non riuscito perche' DBManager non INIZIALIZZATO");
SmistatoreThread.log.info(this.getClass().getName() + " Non AVVIATO!");
return;
}
String jmsConnectionFactory = null;
Properties jmsConnectionFactoryContext = null;
String smistatoreQueue = null;
String registroServiziQueue = null;
String gestoreEventiQueue = null;
String pddQueuePrefix = null;
boolean enginePDD = false;
boolean engineRegistro = false;
boolean engineGestoreEventi = false;
boolean singlePdD = true;
String tipoDatabase = null;
try{
// Leggo le informazioni da queue.properties
jmsConnectionFactory = this.queueProperties.getConnectionFactory();
jmsConnectionFactoryContext = this.queueProperties.getConnectionFactoryContext();
// Leggo le informazioni da console.properties
// nomi code
smistatoreQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaSmistatore();
registroServiziQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaRegistroServizi();
gestoreEventiQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaGestoreEventi();
pddQueuePrefix = this.consoleProperties.getGestioneCentralizzataPrefissoNomeCodaConfigurazionePdd();
// Abilitazione Engine
enginePDD = this.consoleProperties.isGestioneCentralizzataSincronizzazionePdd();
engineRegistro = this.consoleProperties.isGestioneCentralizzataSincronizzazioneRegistro();
engineGestoreEventi = this.consoleProperties.isGestioneCentralizzataSincronizzazioneGestoreEventi();
// Altre informazioni
singlePdD = this.consoleProperties.isSinglePdD();
// Database Info
tipoDatabase = this.datasourceProperties.getTipoDatabase();
}catch(Exception e){
SmistatoreThread.log.info("Smistatore non avviato, sono stati rilevati errori durante la lettura delle configurazione: "+e.getMessage(),e);
return;
}
if (singlePdD) {
SmistatoreThread.log.info("Smistatore non avviato: govwayConsole avviata in singlePdD mode.");
return;
}
// Configurazione JMS
SmistatoreThread.log.debug("Smistatore: Avvio Servizio di Gestione Operazioni, Registro[" + engineRegistro + "] Pdd[" + enginePDD + "] GestoreEventi[" + engineGestoreEventi + "]");
QueueReceiver receiver = null;
Queue queue = null;
QueueConnectionFactory qcf = null;
QueueConnection qc = null;
QueueSession qs = null;
boolean trovato = false;
int i = 0;
SmistatoreThread.log.debug("Smistatore: Inizializzazione Receiver ...");
while (!trovato && (i < 600000)) {
try {
InitialContext ctx = new InitialContext(jmsConnectionFactoryContext);
queue = (Queue) ctx.lookup(smistatoreQueue);
qcf = (QueueConnectionFactory) ctx.lookup(jmsConnectionFactory);
qc = qcf.createQueueConnection();
qc.setExceptionListener(this.exceptionListenerJMS);
qs = qc.createQueueSession(true, -1);
receiver = qs.createReceiver(queue);
qc.start();
ctx.close();
SmistatoreThread.log.debug("Smistatore: Inizializzazione Receiver effettuata.");
trovato = true;
} catch (Exception e) {
i = i + 10000;
Utilities.sleep(10000);
}
}
if (!trovato) {
SmistatoreThread.log.error("Smistatore: Inizializzazione Receiver non effettuata");
return;
}
// Avvio Gestione Operazioni
boolean riconnessioneConErrore = false;
while (this.stop == false) {
try {
// riconnessione precedente non riuscita.....
if (riconnessioneConErrore) {
throw new JMSException("RiconnessioneJMS non riuscita...");
}
// Controllo ExceptionListenerJMS
if (this.exceptionListenerJMS.isConnessioneCorrotta()) {
SmistatoreThread.log.error("ExceptionJMSListener ha rilevato una connessione jms corrotta", this.exceptionListenerJMS.getException());
throw new JMSException("ExceptionJMSListener ha rilevato una connessione jms corrotta: " + this.exceptionListenerJMS.getException().getMessage());
}
SmistatoreThread.log.info("Smistatore: Ricezione operazione...");
ObjectMessage richiesta = null;
while (this.stop == false) {
richiesta = (ObjectMessage) receiver.receive(CostantiControlStation.INTERVALLO_RECEIVE);
if (richiesta != null) {
break;
}
}
if (this.stop == true) {
break;
}
// Ricezione Operazione
OperazioneDaSmistare operazione = null;
try {
operazione = (OperazioneDaSmistare) richiesta.getObject();
} catch (Exception e) {
SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con tipo errato:" + e.toString());
qs.commit();
continue;
}
String idOperazione = richiesta.getStringProperty("ID");
SmistatoreThread.log.info(CostantiControlStation.OPERATIONS_DELIMITER+"Smistatore: Ricevuta richiesta di operazione con ID: " + idOperazione);
SmistatoreThread.log.debug("Smistatore: Dati operazione ricevuta idTab[" + operazione.getIDTable() + "] operazione[" + operazione.getOperazione() + "] pdd[" + operazione.getPdd() + "] oggetto[" + operazione.getOggetto() + "]");
if ((operazione.getOperazione() == null)) {
SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con parametri scorretti.");
qs.commit();
continue;
}
if ((Operazione.change.equals(operazione.getOperazione()) == false) && (Operazione.add.equals(operazione.getOperazione()) == false) && (Operazione.del.equals(operazione.getOperazione()) == false)) {
SmistatoreThread.log.error("Smistatore: Operazione [" + operazione.getOperazione() + "] non supportata dal gestore");
qs.commit();
continue;
}
if ( (operazione.getOggetto() == null) || (operazione.getIDTable() < 0)) {
SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con parametri scorretti.");
qs.commit();
continue;
}
// Guardo che tipo di operazione ho in coda...
// Se e' un'operazione per il registro, la metto nella coda
// OperazioniGestoreRegistroServizi
// Se e' un'operazione per la pdd, la metto nella coda del pdd
// interessato
this.con = this.dbm.getConnection();
Operazione operazioneTipologia = operazione.getOperazione();
String su = operazione.getSuperuser();
TipoOggettoDaSmistare tipoOggettoDaSmistare = operazione.getOggetto();
String pdd = operazione.getPdd();
// Preparo un oggetto di tipo PetraOperation e poi chiamo
// la insertQueue
QueueOperation queueOperation = new QueueOperation();
queueOperation.setTipoOperazione(TipoOperazione.webService);
queueOperation.setOperazione(operazioneTipologia);
queueOperation.setSuperuser(su);
// disabilito il commit
this.con.setAutoCommit(false);
// OggettoClassQueue
ClassQueue cq = null;
try {
cq = new ClassQueue(this.con, tipoDatabase, qs);
} catch (Exception e) {
SmistatoreThread.log.error("Smistatore: Inizializzazione ClassQueue non effettuata: " + e.getMessage());
qs.rollback();
continue;
}
if (tipoOggettoDaSmistare != null) {
long idTable = operazione.getIDTable();
String filter = "[" + operazione.getIDTable() + "]";
filter += "[" + tipoOggettoDaSmistare.name() + "]";
filter += "[" + operazione.getOperazione() + "]";
queueOperation.addParametro(new QueueParameter("Oggetto", tipoOggettoDaSmistare.name()));
queueOperation.addParametro(new QueueParameter("IDTable", "" + idTable));
Map<OperationsParameter, List<String>> params = operazione.getParameters();
if(params!=null && !params.isEmpty()) {
for (OperationsParameter key : params.keySet()) {
// Per ogni parametro presente nell'operazione da smistare
// creo un nuovo PetraParameter con nome key.getNome() e
// valore il valore associato nella tabella
List<String> values = params.get(key);
for (String value : values) {
queueOperation.addParametro(new QueueParameter(key.getNome(), value));
filter += "[" + value + "]";
}
}
}
// Smisto l'operazione
/* ***** REGISTRO ***** */
if (tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.soggetto) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.servizio) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.ruolo) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.accordo) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.accordoCooperazione) ||
// tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.fruitore) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.pdd)) {
// Operazione per il registro
if (engineRegistro) {
QueueOperation queueOperationRegistro = (QueueOperation) queueOperation.clone();
queueOperationRegistro.setTipoOperazione(TipoOperazione.webService);
if (cq.insertQueue(registroServiziQueue, queueOperationRegistro, filter) == 0) {
SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
qs.rollback();
this.con.rollback();
this.dbm.releaseConnection(this.con);
continue;
}
} else {
SmistatoreThread.log.info("Smistatore: sincronizzazione Registro Servizi non abilitata.");
}
}
/* ***** PDD ***** */
if ((pdd != null) && !pdd.equals("") && !pdd.equals("-") ) {
// Operazione per il pdd
if (enginePDD) {
// Qualcuno avra' provveduto a creare una coda per
// il pdd, che si chiama come il pdd stesso
PddCore pddCore = new PddCore();
PdDControlStation myPdd = pddCore.getPdDControlStation(pdd);
String tipoPdd = myPdd.getTipo();
if (PddTipologia.OPERATIVO.toString().equals(tipoPdd)) {
QueueOperation queueOperationPdD = (QueueOperation) queueOperation.clone();
queueOperationPdD.setTipoOperazione(TipoOperazione.webService);
if (cq.insertQueue(pddQueuePrefix + pdd, queueOperationPdD, filter) == 0) {
SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
qs.rollback();
this.con.rollback();
this.dbm.releaseConnection(this.con);
continue;
}
} else {
SmistatoreThread.log.warn("Smistatore: Inserimento in coda non effettuato causa NAL [" + pdd + "] Tipo [" + tipoPdd + "] ");
}
} else {
SmistatoreThread.log.info("Smistatore: sincronizzazione Nal non abilitata.");
}
}
/* ***** Gestore Eventi ***** */
if ( ((gestoreEventiQueue != null) && !gestoreEventiQueue.equals("")) &&
(
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.soggetto) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.servizio) ||
tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.mappingFruizionePD)// ||
//tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.fruitore)
)
){
// Operazione per il registro
if (engineGestoreEventi) {
QueueOperation queueOperationGestoreEventi= (QueueOperation) queueOperation.clone();
queueOperationGestoreEventi.setTipoOperazione(TipoOperazione.webService);
if (cq.insertQueue(gestoreEventiQueue, queueOperationGestoreEventi, filter) == 0) {
SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
qs.rollback();
this.con.rollback();
this.dbm.releaseConnection(this.con);
continue;
}
} else {
SmistatoreThread.log.info("Smistatore: sincronizzazione Gestore Eventi non abilitata.");
}
}
}
this.con.commit();
this.con.setAutoCommit(true);
this.dbm.releaseConnection(this.con);
qs.commit();
SmistatoreThread.log.info("Smistatore: Operazione [" + idOperazione + "] completata.");
} catch (JMSException e) {
try {
qs.rollback();
this.con.rollback();
this.dbm.releaseConnection(this.con);
} catch (Exception er) {
}
SmistatoreThread.log.error("Smistatore: Riscontrato erroreJMS durante la gestione di una richiesta: " + e.toString());
try {
Utilities.sleep(5000);
SmistatoreThread.log.debug("Smistatore: Re-Inizializzazione Receiver ...");
try {
receiver.close();
} catch (Exception eclose) {
}
try {
qs.close();
} catch (Exception eclose) {
}
try {
qc.close();
} catch (Exception eclose) {
}
qc = qcf.createQueueConnection();
// Ripristino stato Exception Listener
if (this.exceptionListenerJMS.isConnessioneCorrotta()) {
this.exceptionListenerJMS.setConnessioneCorrotta(false);
this.exceptionListenerJMS.setException(null);
}
qc.setExceptionListener(this.exceptionListenerJMS);
qs = qc.createQueueSession(true, -1);
receiver = qs.createReceiver(queue);
qc.start();
SmistatoreThread.log.debug("Smistatore: Re-Inizializzazione Receiver effettuata.");
riconnessioneConErrore = false;
} catch (Exception er) {
SmistatoreThread.log.error("Smistatore: Re-Inizializzazione Receiver non effettuata:" + er.toString());
riconnessioneConErrore = true;
}
} catch (Exception e) {
try {
qs.rollback();
this.con.rollback();
this.dbm.releaseConnection(this.con);
} catch (Exception er) {
}
SmistatoreThread.log.error("Smistatore: Riscontrato errore durante la gestione di una richiesta: " + e.toString(), e);
} finally {
try {
this.dbm.releaseConnection(this.con);
} catch (Exception e) {
}
}
}
// Chiusura connessione
try {
if (receiver != null) {
receiver.close();
}
if (qs != null) {
qs.rollback();
qs.close();
}
if (qc != null) {
qc.stop();
qc.close();
}
} catch (Exception e) {
try {
SmistatoreThread.log.error("Smistatore: Riscontrato errore durante la chiusura del Thread: " + e.toString());
} catch (Exception eLogger) {
}
}
this.isRunning = false;
log.debug("Thread terminato");
}
public void stopGestore() {
this.stop = true;
SmistatoreThread.log.debug("Fermo il thread ...");
int timeout = 60;
for (int i = 0; i < timeout; i++) {
if(this.isRunning()){
Utilities.sleep(1000);
}
else{
break;
}
}
if(this.isRunning){
SmistatoreThread.log.debug("Sono trascorsi 60 secondi ed il thread non รจ ancora terminato??");
}
}
}