NodeReceiverDB.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */

  20. package org.openspcoop2.pdd.core.node;

  21. import java.sql.Connection;

  22. import org.openspcoop2.core.id.IDSoggetto;
  23. import org.openspcoop2.pdd.config.DBManager;
  24. import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
  25. import org.openspcoop2.pdd.config.Resource;
  26. import org.openspcoop2.pdd.core.AbstractCore;
  27. import org.openspcoop2.pdd.core.GestoreMessaggi;
  28. import org.openspcoop2.pdd.core.PdDContext;
  29. import org.openspcoop2.pdd.core.state.OpenSPCoopStateful;
  30. import org.openspcoop2.pdd.logger.MsgDiagnostico;
  31. import org.openspcoop2.pdd.services.core.RicezioneBuste;
  32. import org.openspcoop2.pdd.services.core.RicezioneBusteMessage;
  33. import org.openspcoop2.pdd.services.core.RicezioneContenutiApplicativi;
  34. import org.openspcoop2.pdd.services.core.RicezioneContenutiApplicativiMessage;
  35. import org.openspcoop2.protocol.engine.constants.Costanti;
  36. import org.openspcoop2.protocol.engine.driver.RepositoryBuste;
  37. import org.openspcoop2.protocol.sdk.state.StateMessage;
  38. import org.openspcoop2.protocol.sdk.state.StatefulMessage;
  39. import org.openspcoop2.utils.Utilities;

  40. /**
  41.  * Classe utilizzata per la ricezione di messaggi contenuti nell'architettura di OpenSPCoop (versione DB).
  42.  *
  43.  * @author Poli Andrea (apoli@link.it)
  44.  * @author $Author$
  45.  * @version $Rev$, $Date$
  46.  */

  47. public class NodeReceiverDB extends AbstractCore implements INodeReceiver{


  48.     private static OpenSPCoop2Properties openspcoopProperties =
  49.         OpenSPCoop2Properties.getInstance();

  50.    
  51.    
  52.     /**
  53.      * Ricezione di un messaggio  
  54.      *
  55.      * @param codicePorta Codice Porta per cui effettuare la receive
  56.      * @param idModulo Nodo destinatario per cui effettuare la ricezione.
  57.      * @param timeout Timeout sulla ricezione
  58.      * @param checkInterval Intervallo di check sulla coda
  59.      * @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
  60.      *
  61.      */
  62.     @Override
  63.     public Object receive(MsgDiagnostico msgDiag, IDSoggetto codicePorta, String idModulo, String idMessaggio,
  64.             long timeout, long checkInterval) throws NodeException,NodeTimeoutException{

  65.         /* ------------  Lettura parametri del messaggio ricevuto e ValidityCheck -------------- */
  66.         Object objReturn = null;

  67.         // connessione al database
  68.         DBManager dbManager = DBManager.getInstance();
  69.         Resource resource = null;
  70.         Connection connectionDB = null;
  71.                        
  72.         // GestoreMessaggi
  73.         String tipoMessaggio = null;
  74.         if(idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)){
  75.             tipoMessaggio = Costanti.INBOX;
  76.         }
  77.         else if(idModulo.startsWith(RicezioneBuste.ID_MODULO)){
  78.             tipoMessaggio = Costanti.OUTBOX;
  79.         }
  80.        
  81.         OpenSPCoopStateful openspcoopstate = new OpenSPCoopStateful();
  82.         openspcoopstate.setStatoRichiesta(new StatefulMessage(null, null));
  83.         openspcoopstate.setStatoRisposta(new StatefulMessage(null, null));
  84.        
  85.         GestoreMessaggi gestoreMessaggioRichiesta = new GestoreMessaggi(openspcoopstate, false, idMessaggio,tipoMessaggio,msgDiag,null);

  86.         try{
  87.             long attesa = 0;
  88.             String idRisposta = null;
  89.             GestoreMessaggi gestoreMessaggioRisposta = null;
  90.             boolean messaggioPresente = false;
  91.            
  92.             int refreshOnlyCacheCount = 0;
  93.            
  94.             while(attesa<timeout){

  95.                 refreshOnlyCacheCount++;
  96.                 msgDiag.highDebug("Analisi richiesta con ID ["+idMessaggio+"] tipo ["+tipoMessaggio+"] contatoreRif["+refreshOnlyCacheCount+"]");
  97.                
  98.                 int nodeDBRefresh = NodeReceiverDB.openspcoopProperties.getNodeReceiverCheckDBInterval();
  99.                 int nodeRiferimentoMsgRefresh = NodeReceiverDB.openspcoopProperties.getNodeReceiverCheckDBInterval();
  100.                 try{
  101.                     if(nodeRiferimentoMsgRefresh>2)
  102.                         nodeRiferimentoMsgRefresh = nodeDBRefresh / 2;
  103.                 }catch(Exception e){
  104.                     msgDiag.highDebug("CheckDBInterval (proprieta' 'org.openspcoop.pdd.nodeReceiver.checkDB') non corretto: "+e.getMessage());
  105.                 }
  106.                
  107.                 msgDiag.highDebug("Proprieta' nodeDBRefresh["+nodeDBRefresh+"] nodeRiferimentoMsgRefresh["+nodeRiferimentoMsgRefresh+"]");
  108.                
  109.                 // Il riferimento messaggio e il proprietario puo' essere letto dalla cache.
  110.                 // Per migliorare le performance, se non e' presente nella cache si assume anche non presente nel database.
  111.                 // - Per rifMsg se si chiama il metodo mapRiferimentoIntoIDBusta(true) l'id viene cercato solo nella cache
  112.                 // - Per proprietario se si chiama il metodo getProprietario(true) viene cercato solo nella cache
  113.                 // La cache puo' cmq svuotarsi velocemente se la dimensione e' troppo piccola rispetto al numero di msg in parallelo gestiti.
  114.                 // Quindi ogni nodeDBRefresh volte (se checkInterval=500 ogni nodeDBRefresh*500 millisecondi) viene controllato anche il database, oltre alla cache.
  115.                 boolean checkOnlyCache = NodeReceiverDB.openspcoopProperties.isAbilitataCacheGestoreMessaggi();
  116.                 if(refreshOnlyCacheCount==(nodeDBRefresh+1)){
  117.                     msgDiag.highDebug("Re-inizializzo contatore refreshOnlyCacheCount");
  118.                     refreshOnlyCacheCount = 1;
  119.                     checkOnlyCache = false;
  120.                 }
  121.                
  122.                 // Il riferimentoMessaggio puo' cambiare l'associazione nel tempo.
  123.                 // Es. rispostaSincrona che e' un MessaggioErroreProtocollo.... prima viene salvato l'errore, e poi viene salvato il msg ritornato al servizio applicativo.
  124.                 // Entrambi, in ordine avranno come key il rifMessaggio.
  125.                 // Ogni (nodeDBRefresh/2) volte (se checkInterval=500 ogni (nodeDBRefresh/2)*500 millisecondi) viene fatto il refresh del messaggio
  126.                 if( (refreshOnlyCacheCount%nodeRiferimentoMsgRefresh)==0 ){
  127.                     msgDiag.highDebug("Re-inizializzo idRisposta e gestoreMessaggioRisposta");
  128.                     idRisposta = null;
  129.                     gestoreMessaggioRisposta = null;
  130.                 }
  131.                
  132.                 // Prendo la connessione solo se mi serve per la ricerca su database
  133.                 boolean needConnection = false;
  134.                 if(NodeReceiverDB.openspcoopProperties.singleConnectionTransactionManager()==false)
  135.                     needConnection = true;
  136.                 else if(resource==null)
  137.                     needConnection = true;
  138.                 if( (checkOnlyCache==false) && needConnection ){
  139.                     msgDiag.highDebug("Prendo Connessione per NodeReceiver");
  140.                     try{
  141.                         resource = dbManager.getResource(codicePorta,idModulo, PdDContext.getValue(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE, this.getPddContext()));
  142.                     }catch(Exception e){
  143.                         throw new NodeException("Impossibile ottenere una Risorsa dal DBManager",e);
  144.                     }
  145.                     if(resource==null)
  146.                         throw new NodeException("Risorsa is null");
  147.                     if(resource.getResource() == null)  
  148.                         throw new NodeException("Connessione is null");
  149.                     connectionDB = (Connection) resource.getResource();
  150.                     ((StateMessage)openspcoopstate.getStatoRichiesta()).setConnectionDB(connectionDB);
  151.                     ((StateMessage)openspcoopstate.getStatoRisposta()).setConnectionDB(connectionDB);
  152.                 }
  153.                
  154.                 // Check esistenza messaggio
  155.                 if(idRisposta==null){
  156.                     msgDiag.highDebug("Analisi richiesta con ID ["+idMessaggio+"] tipo ["+tipoMessaggio+"]: lettura ID Risposta");
  157.                     idRisposta = gestoreMessaggioRichiesta.mapRiferimentoIntoIDBusta(checkOnlyCache); //only cache
  158.                 }
  159.                 if(idRisposta!=null){
  160.                     if(gestoreMessaggioRisposta==null){
  161.                         msgDiag.highDebug("Analisi richiesta con ID ["+idMessaggio+"] tipo ["+tipoMessaggio+"]: Costruisco GestoreRisposta per ID["+idRisposta+"]");
  162.                         gestoreMessaggioRisposta = new GestoreMessaggi(openspcoopstate, false, idRisposta,tipoMessaggio,msgDiag,null);
  163.                         msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"]: existsMessage("+checkOnlyCache+")");
  164.                         if(checkOnlyCache)
  165.                             messaggioPresente = gestoreMessaggioRisposta.existsMessage_onlyCache();
  166.                         else
  167.                             messaggioPresente = gestoreMessaggioRisposta.existsMessage();
  168.                     }else{
  169.                         if(messaggioPresente==false){
  170.                             msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"]: existsMessage("+checkOnlyCache+")");
  171.                             if(checkOnlyCache)
  172.                                 messaggioPresente = gestoreMessaggioRisposta.existsMessage_onlyCache();
  173.                             else
  174.                                 messaggioPresente = gestoreMessaggioRisposta.existsMessage();
  175.                         }
  176.                     }
  177.                 }
  178.                
  179.                 msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"]: existsMessage="+messaggioPresente);
  180.                
  181.                
  182.                 if(messaggioPresente){

  183.                     // read Proprietario
  184.                     /*String proprietario = null;
  185.                     if( idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)) {
  186.                         proprietario = gestoreMessaggioRisposta.getProprietario_SerializableRead(NodeReceiverDB.openspcoopProperties.getGestioneSerializableDB_AttesaAttiva(),NodeReceiverDB.openspcoopProperties.getGestioneSerializableDB_CheckInterval());
  187.                     } else {
  188.                         proprietario = gestoreMessaggioRisposta.getProprietario();
  189.                     }*/
  190.                     msgDiag.highDebug("getProprietario("+checkOnlyCache+")");
  191.                     String proprietario = gestoreMessaggioRisposta.getProprietario(idModulo,checkOnlyCache);
  192.                     msgDiag.highDebug("getProprietario("+checkOnlyCache+") proprietario="+proprietario);
  193.                    
  194.                     // check proprietario
  195.                     if( idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)) {
  196.                         messaggioPresente = idModulo.equals(proprietario);
  197.                     }else if( idModulo.startsWith(RicezioneBuste.ID_MODULO)) {
  198.                         messaggioPresente = idModulo.equals(proprietario);
  199.                     }
  200.                     msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"] proprietario["+proprietario+" existsMessage="+messaggioPresente);
  201.                    
  202.                 }
  203.                
  204.                 // Gestione messaggio
  205.                 if(messaggioPresente==false){
  206.                    
  207.                     //  rilascio e riprendo la connessione ogni checkInterval fino ad un timeout o alla ricezione di un oggetto
  208.                     if( (NodeReceiverDB.openspcoopProperties.singleConnectionNodeReceiver()==false) && (checkOnlyCache==false) ){
  209.                         msgDiag.highDebug("Rilascio connessione per NodeReceiver");
  210.                         dbManager.releaseResource(codicePorta, idModulo, resource);
  211.                     }
  212.                    
  213.                     msgDiag.highDebug("Sleep...");
  214.                     Utilities.sleep(checkInterval);
  215.                     attesa = attesa + checkInterval;

  216.                 }else{
  217.                    
  218.                     // Prendo informazioni da RepositoryBuste, se non era settata la connesione, la setto
  219.                     if( needConnection && checkOnlyCache){
  220.                         msgDiag.highDebug("Prendo Connessione per NodeReceiver");
  221.                         try{
  222.                             resource = dbManager.getResource(codicePorta,idModulo, PdDContext.getValue(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE, this.getPddContext()));
  223.                         }catch(Exception e){
  224.                             throw new NodeException("Impossibile ottenere una Risorsa dal DBManager",e);
  225.                         }
  226.                         if(resource==null)
  227.                             throw new NodeException("Risorsa is null");
  228.                         if(resource.getResource() == null)
  229.                             throw new NodeException("Connessione is null");
  230.                         connectionDB = (Connection) resource.getResource();
  231.                     }
  232.                    
  233.                     StatefulMessage state = new StatefulMessage(connectionDB, null);
  234.                     if(idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)){
  235.                         msgDiag.highDebug("Lettura risposta per RicezioneContenutiApplicativi...");
  236.                         objReturn = new RicezioneContenutiApplicativiMessage();
  237.                         ((RicezioneContenutiApplicativiMessage)objReturn).setIdBustaRisposta(idRisposta);
  238.                         RepositoryBuste repositoryBuste = new RepositoryBuste(state, false,null);
  239.                         ((RicezioneContenutiApplicativiMessage)objReturn).setIdCollaborazione(repositoryBuste.getCollaborazioneFromInBox(idRisposta));
  240.                         ((RicezioneContenutiApplicativiMessage)objReturn).setProfiloCollaborazione(repositoryBuste.getProfiloCollaborazioneFromInBox(idRisposta),
  241.                                 repositoryBuste.getProfiloCollaborazioneValueFromInBox(idRisposta));
  242.                         try{
  243.                             ((RicezioneContenutiApplicativiMessage)objReturn).setPddContext(gestoreMessaggioRisposta.getPdDContext());
  244.                         }catch(Exception e){
  245.                             // ignore
  246.                         }
  247.                         msgDiag.highDebug("Lettura risposta per RicezioneContenutiApplicativi effettuata");
  248.                     }
  249.                     else if(idModulo.startsWith(RicezioneBuste.ID_MODULO)){
  250.                         msgDiag.highDebug("Lettura risposta per RicezioneBuste...");
  251.                         objReturn = new RicezioneBusteMessage();
  252.                         RepositoryBuste repositoryBuste = new RepositoryBuste(state, false,null);
  253.                         if(repositoryBuste.isRegistrataIntoOutBox(idRisposta)){
  254.                             ((RicezioneBusteMessage)objReturn).setBustaRisposta(repositoryBuste.getBustaFromOutBox(idRisposta));
  255.                         }else{
  256.                             ((RicezioneBusteMessage)objReturn).setIdMessaggioSblocco(idRisposta);
  257.                         }
  258.                         try{
  259.                             ((RicezioneBusteMessage)objReturn).setPddContext(gestoreMessaggioRisposta.getPdDContext());
  260.                         }catch(Exception e){
  261.                             // ignore
  262.                         }
  263.                         msgDiag.highDebug("Lettura risposta per RicezioneBuste effettuata");
  264.                     }
  265.                    
  266.                     // rilascio e riprendo la connessione ogni checkInterval fino ad un timeout o alla ricezione di un oggetto
  267.                     if( (NodeReceiverDB.openspcoopProperties.singleConnectionNodeReceiver()==false) && (checkOnlyCache==false) ){
  268.                         msgDiag.highDebug("Rilascio connessione per NodeReceiver");
  269.                         dbManager.releaseResource(codicePorta, idModulo, resource);
  270.                     }
  271.                    
  272.                     msgDiag.highDebug("Fine Lettura");
  273.                     break;
  274.                 }

  275.             }

  276.         } catch (Exception e) {
  277.             throw new NodeException("Riscontrato errore nella ricezione del messaggio di risposta per la gestione della richiesta:"
  278.                     +e.getMessage(),e);
  279.         } finally{
  280.             msgDiag.highDebug("Rilascio connessione per NodeReceiver");
  281.             dbManager.releaseResource(codicePorta, idModulo, resource);
  282.         }
  283.        
  284.         if(objReturn == null){
  285.             throw new NodeTimeoutException("Riscontrato errore durante ricezione del messaggio: Messaggio non ricevuto");
  286.         }
  287.        
  288.         return objReturn;
  289.     }

  290. }