SmistatoreThread.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */


  20. package org.openspcoop2.web.ctrlstat.gestori;

  21. import java.sql.Connection;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.Properties;

  25. import javax.jms.JMSException;
  26. import javax.jms.ObjectMessage;
  27. import javax.jms.Queue;
  28. import javax.jms.QueueConnection;
  29. import javax.jms.QueueConnectionFactory;
  30. import javax.jms.QueueReceiver;
  31. import javax.jms.QueueSession;
  32. import javax.naming.InitialContext;

  33. import org.openspcoop2.core.registry.constants.PddTipologia;
  34. import org.openspcoop2.pdd.config.OpenSPCoop2ConfigurationException;
  35. import org.openspcoop2.utils.Utilities;
  36. import org.openspcoop2.utils.transport.jms.ExceptionListenerJMS;
  37. import org.openspcoop2.web.ctrlstat.config.ConsoleProperties;
  38. import org.openspcoop2.web.ctrlstat.config.DatasourceProperties;
  39. import org.openspcoop2.web.ctrlstat.core.ControlStationLogger;
  40. import org.openspcoop2.web.ctrlstat.core.DBManager;
  41. import org.openspcoop2.web.ctrlstat.core.OperazioneDaSmistare;
  42. import org.openspcoop2.web.ctrlstat.costanti.CostantiControlStation;
  43. import org.openspcoop2.web.ctrlstat.costanti.OperationsParameter;
  44. import org.openspcoop2.web.ctrlstat.costanti.TipoOggettoDaSmistare;
  45. import org.openspcoop2.web.ctrlstat.dao.PdDControlStation;
  46. import org.openspcoop2.web.ctrlstat.servlet.pdd.PddCore;
  47. import org.openspcoop2.web.lib.queue.ClassQueue;
  48. import org.openspcoop2.web.lib.queue.QueueOperation;
  49. import org.openspcoop2.web.lib.queue.QueueParameter;
  50. import org.openspcoop2.web.lib.queue.config.QueueProperties;
  51. import org.openspcoop2.web.lib.queue.costanti.Operazione;
  52. import org.openspcoop2.web.lib.queue.costanti.TipoOperazione;
  53. import org.slf4j.Logger;

  54. /**
  55.  * SmistatoreThread
  56.  *
  57.  * @author Andrea Poli (apoli@link.it)
  58.  * @author Stefano Corallo (corallo@link.it)
  59.  * @author Sandra Giangrandi (sandra@link.it)
  60.  * @author $Author$
  61.  * @version $Rev$, $Date$
  62.  *
  63.  */
  64. public class SmistatoreThread extends Thread {

  65.     /** Logger utilizzato per debug. */
  66.     private static Logger log = null;
  67.    
  68.     /** run */
  69.     private boolean stop = false;
  70.     private boolean isRunning = false;
  71.     public boolean isRunning() {
  72.         return this.isRunning;
  73.     }

  74.     private DBManager dbm;
  75.     private Connection con;

  76.     private ExceptionListenerJMS exceptionListenerJMS = new ExceptionListenerJMS();

  77.     private ConsoleProperties consoleProperties;
  78.    
  79.     private QueueProperties queueProperties;
  80.    
  81.     private DatasourceProperties datasourceProperties;
  82.    
  83.     /** Costruttore
  84.      * @throws OpenSPCoop2ConfigurationException */
  85.     public SmistatoreThread() throws OpenSPCoop2ConfigurationException {

  86.         // configuro il logger
  87.         SmistatoreThread.log = ControlStationLogger.getSmistatoreLogger();

  88.         this.dbm = DBManager.getInstance();
  89.        
  90.         this.consoleProperties = ConsoleProperties.getInstance();
  91.        
  92.         this.queueProperties = QueueProperties.getInstance();
  93.        
  94.         this.datasourceProperties = DatasourceProperties.getInstance();
  95.     }

  96.     /**
  97.      * Metodo che fa partire il Thread.
  98.      *
  99.      * @since 0.4
  100.      */
  101.     @Override
  102.     public void run() {

  103.         this.isRunning = true;
  104.        
  105.         // Controllo se dbmanager inizializzato
  106.         if (!DBManager.isInitialized()) {
  107.             SmistatoreThread.log.info("Inizializzazione di " + this.getClass().getSimpleName() + " non riuscito perche' DBManager non INIZIALIZZATO");
  108.             SmistatoreThread.log.info(this.getClass().getName() + " Non AVVIATO!");
  109.             return;
  110.         }

  111.         String jmsConnectionFactory = null;
  112.         Properties jmsConnectionFactoryContext = null;
  113.        
  114.         String smistatoreQueue = null;
  115.         String registroServiziQueue = null;
  116.         String gestoreEventiQueue = null;
  117.         String pddQueuePrefix = null;
  118.        
  119.         boolean enginePDD = false;
  120.         boolean engineRegistro = false;
  121.         boolean engineGestoreEventi = false;
  122.        
  123.         boolean singlePdD = true;
  124.         String tipoDatabase = null;
  125.        
  126.         try{
  127.             // Leggo le informazioni da queue.properties
  128.             jmsConnectionFactory = this.queueProperties.getConnectionFactory();
  129.             jmsConnectionFactoryContext = this.queueProperties.getConnectionFactoryContext();
  130.            
  131.             // Leggo le informazioni da console.properties
  132.            
  133.             // nomi code
  134.             smistatoreQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaSmistatore();
  135.             registroServiziQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaRegistroServizi();
  136.             gestoreEventiQueue = this.consoleProperties.getGestioneCentralizzataNomeCodaGestoreEventi();
  137.             pddQueuePrefix = this.consoleProperties.getGestioneCentralizzataPrefissoNomeCodaConfigurazionePdd();
  138.            
  139.             // Abilitazione Engine
  140.             enginePDD = this.consoleProperties.isGestioneCentralizzataSincronizzazionePdd();
  141.             engineRegistro = this.consoleProperties.isGestioneCentralizzataSincronizzazioneRegistro();
  142.             engineGestoreEventi = this.consoleProperties.isGestioneCentralizzataSincronizzazioneGestoreEventi();
  143.            
  144.             // Altre informazioni
  145.             singlePdD = this.consoleProperties.isSinglePdD();
  146.            
  147.             // Database Info
  148.             tipoDatabase = this.datasourceProperties.getTipoDatabase();
  149.            
  150.         }catch(Exception e){
  151.             SmistatoreThread.log.info("Smistatore non avviato, sono stati rilevati errori durante la lettura delle configurazione: "+e.getMessage(),e);
  152.             return;
  153.         }
  154.        
  155.         if (singlePdD) {
  156.             SmistatoreThread.log.info("Smistatore non avviato: govwayConsole avviata in singlePdD mode.");
  157.             return;
  158.         }

  159.         // Configurazione JMS
  160.         SmistatoreThread.log.debug("Smistatore: Avvio Servizio di Gestione Operazioni, Registro[" + engineRegistro + "] Pdd[" + enginePDD + "] GestoreEventi[" + engineGestoreEventi + "]");
  161.         QueueReceiver receiver = null;
  162.         Queue queue = null;
  163.         QueueConnectionFactory qcf = null;
  164.         QueueConnection qc = null;
  165.         QueueSession qs = null;
  166.         boolean trovato = false;
  167.         int i = 0;
  168.         SmistatoreThread.log.debug("Smistatore: Inizializzazione Receiver ...");
  169.         while (!trovato && (i < 600000)) {
  170.             try {
  171.                 InitialContext ctx = new InitialContext(jmsConnectionFactoryContext);
  172.                 queue = (Queue) ctx.lookup(smistatoreQueue);
  173.                 qcf = (QueueConnectionFactory) ctx.lookup(jmsConnectionFactory);
  174.                 qc = qcf.createQueueConnection();
  175.                 qc.setExceptionListener(this.exceptionListenerJMS);
  176.                 qs = qc.createQueueSession(true, -1);
  177.                 receiver = qs.createReceiver(queue);
  178.                 qc.start();
  179.                 ctx.close();
  180.                 SmistatoreThread.log.debug("Smistatore: Inizializzazione Receiver effettuata.");
  181.                 trovato = true;
  182.             } catch (Exception e) {
  183.                 i = i + 10000;
  184.                 Utilities.sleep(10000);
  185.             }
  186.         }

  187.         if (!trovato) {
  188.             SmistatoreThread.log.error("Smistatore: Inizializzazione Receiver non effettuata");
  189.             return;
  190.         }

  191.         // Avvio Gestione Operazioni
  192.         boolean riconnessioneConErrore = false;
  193.         while (this.stop == false) {

  194.             try {

  195.                 // riconnessione precedente non riuscita.....
  196.                 if (riconnessioneConErrore) {
  197.                     throw new JMSException("RiconnessioneJMS non riuscita...");
  198.                 }
  199.                 // Controllo ExceptionListenerJMS
  200.                 if (this.exceptionListenerJMS.isConnessioneCorrotta()) {
  201.                     SmistatoreThread.log.error("ExceptionJMSListener ha rilevato una connessione jms corrotta", this.exceptionListenerJMS.getException());
  202.                     throw new JMSException("ExceptionJMSListener ha rilevato una connessione jms corrotta: " + this.exceptionListenerJMS.getException().getMessage());
  203.                 }

  204.                 SmistatoreThread.log.info("Smistatore: Ricezione operazione...");
  205.                 ObjectMessage richiesta = null;
  206.                 while (this.stop == false) {
  207.                     richiesta = (ObjectMessage) receiver.receive(CostantiControlStation.INTERVALLO_RECEIVE);
  208.                     if (richiesta != null) {
  209.                         break;
  210.                     }
  211.                 }
  212.                 if (this.stop == true) {
  213.                     break;
  214.                 }

  215.                 // Ricezione Operazione
  216.                 OperazioneDaSmistare operazione = null;
  217.                 try {
  218.                     operazione = (OperazioneDaSmistare) richiesta.getObject();
  219.                 } catch (Exception e) {
  220.                     SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con tipo errato:" + e.toString());
  221.                     qs.commit();
  222.                     continue;
  223.                 }

  224.                 String idOperazione = richiesta.getStringProperty("ID");

  225.                 SmistatoreThread.log.info(CostantiControlStation.OPERATIONS_DELIMITER+"Smistatore: Ricevuta richiesta di operazione con ID: " + idOperazione);
  226.                 SmistatoreThread.log.debug("Smistatore: Dati operazione ricevuta idTab[" + operazione.getIDTable() + "] operazione[" + operazione.getOperazione() + "] pdd[" + operazione.getPdd() + "] oggetto[" + operazione.getOggetto() + "]");

  227.                 if ((operazione.getOperazione() == null)) {
  228.                     SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con parametri scorretti.");
  229.                     qs.commit();
  230.                     continue;
  231.                 }

  232.                 if ((Operazione.change.equals(operazione.getOperazione()) == false) && (Operazione.add.equals(operazione.getOperazione()) == false) && (Operazione.del.equals(operazione.getOperazione()) == false)) {
  233.                     SmistatoreThread.log.error("Smistatore: Operazione [" + operazione.getOperazione() + "] non supportata dal gestore");
  234.                     qs.commit();
  235.                     continue;
  236.                 }

  237.                 if ( (operazione.getOggetto() == null) || (operazione.getIDTable() < 0)) {
  238.                     SmistatoreThread.log.error("Smistatore: Ricevuta richiesta con parametri scorretti.");
  239.                     qs.commit();
  240.                     continue;
  241.                 }

  242.                 // Guardo che tipo di operazione ho in coda...
  243.                 // Se e' un'operazione per il registro, la metto nella coda
  244.                 // OperazioniGestoreRegistroServizi
  245.                 // Se e' un'operazione per la pdd, la metto nella coda del pdd
  246.                 // interessato

  247.                 this.con = this.dbm.getConnection();

  248.                 Operazione operazioneTipologia = operazione.getOperazione();
  249.                 String su = operazione.getSuperuser();
  250.                 TipoOggettoDaSmistare tipoOggettoDaSmistare = operazione.getOggetto();
  251.                 String pdd = operazione.getPdd();

  252.                 // Preparo un oggetto di tipo PetraOperation e poi chiamo
  253.                 // la insertQueue
  254.                 QueueOperation queueOperation = new QueueOperation();
  255.                 queueOperation.setTipoOperazione(TipoOperazione.webService);
  256.                 queueOperation.setOperazione(operazioneTipologia);
  257.                 queueOperation.setSuperuser(su);

  258.                 // disabilito il commit
  259.                 this.con.setAutoCommit(false);

  260.                 // OggettoClassQueue
  261.                 ClassQueue cq = null;
  262.                 try {
  263.                     cq = new ClassQueue(this.con, tipoDatabase, qs);
  264.                 } catch (Exception e) {
  265.                     SmistatoreThread.log.error("Smistatore: Inizializzazione ClassQueue non effettuata: " + e.getMessage());
  266.                     qs.rollback();
  267.                     continue;
  268.                 }

  269.                 if (tipoOggettoDaSmistare != null) {
  270.                     long idTable = operazione.getIDTable();

  271.                     String filter = "[" + operazione.getIDTable() + "]";
  272.                     filter += "[" + tipoOggettoDaSmistare.name() + "]";
  273.                     filter += "[" + operazione.getOperazione() + "]";

  274.                     queueOperation.addParametro(new QueueParameter("Oggetto", tipoOggettoDaSmistare.name()));
  275.                     queueOperation.addParametro(new QueueParameter("IDTable", "" + idTable));

  276.                     Map<OperationsParameter, List<String>> params = operazione.getParameters();
  277.                     if(params!=null && !params.isEmpty()) {
  278.                         for (OperationsParameter key : params.keySet()) {
  279.                             // Per ogni parametro presente nell'operazione da smistare
  280.                             // creo un nuovo PetraParameter con nome key.getNome() e
  281.                             // valore il valore associato nella tabella
  282.    
  283.                             List<String> values = params.get(key);
  284.                             for (String value : values) {
  285.                                 queueOperation.addParametro(new QueueParameter(key.getNome(), value));
  286.                                 filter += "[" + value + "]";
  287.                             }
  288.    
  289.                         }
  290.                     }

  291.                     // Smisto l'operazione

  292.                     /* ***** REGISTRO ***** */
  293.                    
  294.                     if (tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.soggetto) ||
  295.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.servizio) ||
  296.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.ruolo) ||
  297.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.accordo) ||
  298.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.accordoCooperazione) ||
  299. //                          tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.fruitore) ||
  300.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.pdd)) {
  301.                         // Operazione per il registro
  302.                         if (engineRegistro) {
  303.                             QueueOperation queueOperationRegistro = (QueueOperation) queueOperation.clone();
  304.                             queueOperationRegistro.setTipoOperazione(TipoOperazione.webService);
  305.                             if (cq.insertQueue(registroServiziQueue, queueOperationRegistro, filter) == 0) {
  306.                                 SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
  307.                                 qs.rollback();
  308.                                 this.con.rollback();
  309.                                 this.dbm.releaseConnection(this.con);
  310.                                 continue;
  311.                             }
  312.                         } else {
  313.                             SmistatoreThread.log.info("Smistatore: sincronizzazione Registro Servizi non abilitata.");
  314.                         }
  315.                     }

  316.                     /* ***** PDD ***** */
  317.                    
  318.                     if ((pdd != null) && !pdd.equals("") && !pdd.equals("-") ) {
  319.                         // Operazione per il pdd
  320.                         if (enginePDD) {
  321.                             // Qualcuno avra' provveduto a creare una coda per
  322.                             // il pdd, che si chiama come il pdd stesso
  323.                             PddCore pddCore = new PddCore();
  324.                             PdDControlStation myPdd = pddCore.getPdDControlStation(pdd);
  325.                             String tipoPdd = myPdd.getTipo();

  326.                             if (PddTipologia.OPERATIVO.toString().equals(tipoPdd)) {
  327.                                 QueueOperation queueOperationPdD = (QueueOperation) queueOperation.clone();
  328.                                 queueOperationPdD.setTipoOperazione(TipoOperazione.webService);
  329.                                 if (cq.insertQueue(pddQueuePrefix + pdd, queueOperationPdD, filter) == 0) {
  330.                                     SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
  331.                                     qs.rollback();
  332.                                     this.con.rollback();
  333.                                     this.dbm.releaseConnection(this.con);
  334.                                     continue;
  335.                                 }
  336.                             } else {
  337.                                 SmistatoreThread.log.warn("Smistatore: Inserimento in coda non effettuato causa NAL [" + pdd + "] Tipo [" + tipoPdd + "] ");
  338.                             }
  339.                         } else {
  340.                             SmistatoreThread.log.info("Smistatore: sincronizzazione Nal non abilitata.");
  341.                         }
  342.                     }

  343.                     /* ***** Gestore Eventi ***** */
  344.                    
  345.                     if ( ((gestoreEventiQueue != null) && !gestoreEventiQueue.equals("")) &&
  346.                             (
  347.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.soggetto) ||
  348.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.servizio) ||
  349.                             tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.mappingFruizionePD)// ||
  350.                             //tipoOggettoDaSmistare.equals(TipoOggettoDaSmistare.fruitore)
  351.                             )
  352.                         ){
  353.                         // Operazione per il registro
  354.                         if (engineGestoreEventi) {
  355.                             QueueOperation queueOperationGestoreEventi= (QueueOperation) queueOperation.clone();
  356.                             queueOperationGestoreEventi.setTipoOperazione(TipoOperazione.webService);
  357.                             if (cq.insertQueue(gestoreEventiQueue, queueOperationGestoreEventi, filter) == 0) {
  358.                                 SmistatoreThread.log.error("Smistatore: Si e' verificato un problema durante l'inserimento in coda.");
  359.                                 qs.rollback();
  360.                                 this.con.rollback();
  361.                                 this.dbm.releaseConnection(this.con);
  362.                                 continue;
  363.                             }
  364.                         } else {
  365.                             SmistatoreThread.log.info("Smistatore: sincronizzazione Gestore Eventi non abilitata.");
  366.                         }
  367.                     }
  368.                 }

  369.                 this.con.commit();
  370.                 this.con.setAutoCommit(true);
  371.                 this.dbm.releaseConnection(this.con);

  372.                 qs.commit();

  373.                 SmistatoreThread.log.info("Smistatore: Operazione [" + idOperazione + "] completata.");

  374.             } catch (JMSException e) {
  375.                 try {
  376.                     qs.rollback();
  377.                     this.con.rollback();
  378.                     this.dbm.releaseConnection(this.con);
  379.                 } catch (Exception er) {
  380.                 }
  381.                 SmistatoreThread.log.error("Smistatore: Riscontrato erroreJMS durante la gestione di una richiesta: " + e.toString());
  382.                 try {
  383.                     Utilities.sleep(5000);
  384.                     SmistatoreThread.log.debug("Smistatore: Re-Inizializzazione Receiver ...");
  385.                     try {
  386.                         receiver.close();
  387.                     } catch (Exception eclose) {
  388.                     }
  389.                     try {
  390.                         qs.close();
  391.                     } catch (Exception eclose) {
  392.                     }
  393.                     try {
  394.                         qc.close();
  395.                     } catch (Exception eclose) {
  396.                     }
  397.                     qc = qcf.createQueueConnection();
  398.                     // Ripristino stato Exception Listener
  399.                     if (this.exceptionListenerJMS.isConnessioneCorrotta()) {
  400.                         this.exceptionListenerJMS.setConnessioneCorrotta(false);
  401.                         this.exceptionListenerJMS.setException(null);
  402.                     }
  403.                     qc.setExceptionListener(this.exceptionListenerJMS);
  404.                     qs = qc.createQueueSession(true, -1);
  405.                     receiver = qs.createReceiver(queue);
  406.                     qc.start();
  407.                     SmistatoreThread.log.debug("Smistatore: Re-Inizializzazione Receiver effettuata.");
  408.                     riconnessioneConErrore = false;

  409.                 } catch (Exception er) {
  410.                     SmistatoreThread.log.error("Smistatore: Re-Inizializzazione Receiver non effettuata:" + er.toString());
  411.                     riconnessioneConErrore = true;
  412.                 }
  413.             } catch (Exception e) {
  414.                 try {
  415.                     qs.rollback();
  416.                     this.con.rollback();
  417.                     this.dbm.releaseConnection(this.con);
  418.                 } catch (Exception er) {
  419.                 }
  420.                 SmistatoreThread.log.error("Smistatore: Riscontrato errore durante la gestione di una richiesta: " + e.toString(), e);
  421.             } finally {

  422.                 try {
  423.                     this.dbm.releaseConnection(this.con);
  424.                 } catch (Exception e) {

  425.                 }
  426.             }
  427.         }

  428.         // Chiusura connessione
  429.         try {
  430.             if (receiver != null) {
  431.                 receiver.close();
  432.             }
  433.             if (qs != null) {
  434.                 qs.rollback();
  435.                 qs.close();
  436.             }
  437.             if (qc != null) {
  438.                 qc.stop();
  439.                 qc.close();
  440.             }
  441.         } catch (Exception e) {
  442.             try {
  443.                 SmistatoreThread.log.error("Smistatore: Riscontrato errore durante la chiusura del Thread: " + e.toString());
  444.             } catch (Exception eLogger) {
  445.             }
  446.         }
  447.        
  448.         this.isRunning = false;
  449.         log.debug("Thread terminato");
  450.     }

  451.     public void stopGestore() {
  452.         this.stop = true;
  453.        
  454.         SmistatoreThread.log.debug("Fermo il thread ...");
  455.         int timeout = 60;
  456.         for (int i = 0; i < timeout; i++) {
  457.             if(this.isRunning()){
  458.                 Utilities.sleep(1000);
  459.             }
  460.             else{
  461.                 break;
  462.             }
  463.         }
  464.         if(this.isRunning){
  465.             SmistatoreThread.log.debug("Sono trascorsi 60 secondi ed il thread non รจ ancora terminato??");
  466.         }
  467.     }
  468. }