JMSReceiver.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */



  20. package org.openspcoop2.pdd.core;

  21. import java.util.HashMap;
  22. import java.util.Map;

  23. import javax.jms.JMSException;
  24. import javax.jms.ObjectMessage;
  25. import javax.jms.MessageConsumer;
  26. import javax.jms.Queue;

  27. import org.slf4j.Logger;
  28. import org.openspcoop2.core.id.IDSoggetto;
  29. import org.openspcoop2.pdd.config.JMSObject;
  30. import org.openspcoop2.pdd.config.QueueManager;
  31. import org.openspcoop2.pdd.config.Resource;
  32. import org.openspcoop2.utils.Utilities;

  33. /**
  34.  * Classe utilizzata per ricevere messaggi JMS dai componenti dell'architettura di OpenSPCoop.
  35.  *
  36.  * @author Poli Andrea (apoli@link.it)
  37.  * @author $Author$
  38.  * @version $Rev$, $Date$
  39.  */

  40. public class JMSReceiver {

  41.    
  42.     /** QueueManager */
  43.     private QueueManager qmanager;
  44.     /** Indicazione sul modulo che utilizza il Sender  */
  45.     private String idModulo = null;
  46.     /** Indicazione sul codice porta del Sender */
  47.     private IDSoggetto codicePorta = null;

  48.     /** Object Ricevuto */
  49.     private Object received;
  50.     /** Proprieta' Ricevute */
  51.     private Map<String,java.io.Serializable> propertiesReceived;
  52.     /** motivo di un eventuale errore */
  53.     private String errore;
  54.     /** ID JMS presente nell'header del messaggio ricevuto */
  55.     private String idHeaderJMS;
  56.    
  57.     /** Indicazione se deve essere utilizzata una singola connessione JMS */
  58.     private boolean singleConnection;
  59.    
  60.     /** Logger */
  61.     private Logger log;

  62.     /** IDTransazione */
  63.     private String idTransazione;
  64.    

  65.     /**
  66.      * Costruttore
  67.      *
  68.      * @param aCodicePorta Codice del dominio che sta gestendo la richiesta.
  69.      * @param aIDModulo Identificativo del Receiver.
  70.      * @param singleConnection Indicazione se deve essere utilizzata una singola connessione JMS
  71.      *
  72.      */
  73.     public JMSReceiver(IDSoggetto aCodicePorta,String aIDModulo,boolean singleConnection,Logger log,String idTransazione) throws Exception {
  74.         this.codicePorta = aCodicePorta;
  75.         this.idModulo = "JMSReceiver."+aIDModulo;
  76.         this.singleConnection = singleConnection;
  77.         this.qmanager = QueueManager.getInstance();
  78.         this.log = log;
  79.         this.idTransazione = idTransazione;
  80.     }

  81.     /**
  82.      * Ricezione di un messaggio  (Serve per filtrare una coda con un messaggio)
  83.      *
  84.      * @param destinatario Nodo destinatario per cui effettuare la ricezione.
  85.      * @param msgSelector Filtro da utilizzare per la ricezione
  86.      * @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
  87.      *
  88.      */
  89.     public boolean clean(String destinatario,String msgSelector){
  90.         return receive(destinatario,msgSelector,1,1);
  91.     }
  92.    
  93.     /**
  94.      * Ricezione di un messaggio  
  95.      *
  96.      * @param destinatario Nodo destinatario per cui effettuare la ricezione.
  97.      * @param timeout Timeout sulla ricezione
  98.      * @param checkInterval Intervallo di check sulla coda
  99.      * @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
  100.      *
  101.      */
  102.     public boolean receive(String destinatario,long timeout,long checkInterval){
  103.         return receive(destinatario,null,timeout,checkInterval);
  104.     }
  105.     /**
  106.      * Ricezione di un messaggio  
  107.      *
  108.      * @param destinatario Nodo destinatario per cui effettuare la ricezione.
  109.      * @param msgSelector Filtro da utilizzare per la ricezione
  110.      * @param timeout Timeout sulla ricezione
  111.      * @param checkInterval Intervallo di check sulla coda
  112.      * @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
  113.      *
  114.      */
  115.     @SuppressWarnings("resource")
  116.     public boolean receive(String destinatario,String msgSelector,long timeout,long checkInterval){

  117.         Resource resource = null;
  118.         MessageConsumer receiver = null;
  119.         try{

  120.             JMSObject jmsObject = null;
  121.             try{
  122.                 resource = this.qmanager.getResource(this.codicePorta,this.idModulo,this.idTransazione);
  123.                 if(resource == null)
  124.                     throw new JMSException("Resource is null");
  125.                 if(resource.getResource() == null)
  126.                     throw new JMSException("JMSObject is null");
  127.                 jmsObject = (JMSObject) resource.getResource();
  128.                 if(jmsObject.getConnection()==null)
  129.                     throw new Exception("Connessione is null");
  130.                 if(jmsObject.getSession()==null)
  131.                     throw new Exception("Sessione is null");
  132.             }catch(Exception e){
  133.                 this.log.error("JMSObject non ottenibile dal Pool: "+e.getMessage(),e);
  134.                 this.errore ="JMSObject non ottenibile dal Pool: "+e.getMessage();
  135.                 return false;
  136.             }
  137.            
  138.             if(destinatario == null){
  139.                 this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  140.                 return false; // non deve essere ricevuto nulla ??.
  141.             }
  142.                
  143.             // Get Coda
  144.             Queue queue = this.qmanager.getQueue(destinatario);
  145.             if(queue == null){
  146.                 this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  147.                 this.errore="La coda ["+destinatario+"] non e' tra quelle registrate per OpenSPCoop";
  148.                 return false;
  149.             }

  150.             //  Start connection
  151.             try{
  152.                 jmsObject.getConnection().start();
  153.             }catch(javax.jms.JMSException e){
  154.                 this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  155.                 this.errore = "Riscontrato errore durante lo start della connessione ["+destinatario+"] :"+e.getMessage();
  156.                 return false;
  157.             }
  158.            
  159.             // Receiver
  160.             try{
  161.                 if(msgSelector != null)
  162.                     receiver = jmsObject.getSession().createConsumer(queue,msgSelector);
  163.                 else
  164.                     receiver = jmsObject.getSession().createConsumer(queue);
  165.             }catch(javax.jms.JMSException e){
  166.                 this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  167.                 this.log.error("Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage(),e);
  168.                 this.errore = "Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage();
  169.                 return false;
  170.             }

  171.             // Ricezione Oggetto
  172.             ObjectMessage receivedMsg = null;
  173.             long attesa = 0;
  174.             while(attesa<timeout){
  175.                
  176.                 //System.out.println("WHILE ["+this.idModulo+"]");
  177.                
  178.                 receivedMsg = (ObjectMessage) receiver.receive(checkInterval);
  179.                 attesa = attesa + checkInterval;
  180.                 if(receivedMsg==null){
  181.                    
  182.                     //  rilascio e riprendo la connessione JMS ogni checkInterval fino ad un timeout o alla ricezione di un oggetto
  183.                     if(this.singleConnection==false){
  184.                         //System.out.println("Rilascio ["+this.idModulo+"]");
  185.                         receiver.close();
  186.                         this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  187.                     }
  188.                    
  189.                     Utilities.sleep(checkInterval);
  190.                     attesa = attesa + checkInterval;
  191.                    
  192.                     if(this.singleConnection==false){
  193.                         try{
  194.                             resource = this.qmanager.getResource(this.codicePorta,this.idModulo,this.idTransazione);
  195.                             if(resource == null)
  196.                                 throw new JMSException("Resource is null");
  197.                             if(resource.getResource() == null)
  198.                                 throw new JMSException("JMSObject is null");
  199.                             jmsObject = (JMSObject) resource.getResource();
  200.                         }catch(Exception e){
  201.                             this.log.error("JMSObject non ottenibile dal Pool: "+e.getMessage(),e);
  202.                             this.errore ="JMSObject non ottenibile dal Pool: "+e.getMessage();
  203.                             return false;
  204.                         }
  205.                         //System.out.println("Inizializzo ["+this.idModulo+"] ["+msgSelector+"]");
  206.                         //  Start connection
  207.                         try{
  208.                             jmsObject.getConnection().start();
  209.                         }catch(javax.jms.JMSException e){
  210.                             this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  211.                             this.log.error("Riscontrato errore durante lo start della connessione ["+destinatario+"] :"+e.getMessage(),e);
  212.                             this.errore = "Riscontrato errore durante lo start della connessione ["+destinatario+"] :"+e.getMessage();
  213.                             return false;
  214.                         }  
  215.                         // Reinizializzo Receiver
  216.                         try{
  217.                             if(msgSelector != null){
  218.                                 //System.out.println("MSG SELECTOR ["+msgSelector+"]");
  219.                                 receiver = jmsObject.getSession().createConsumer(queue,msgSelector);
  220.                             }else{
  221.                                 receiver = jmsObject.getSession().createConsumer(queue);
  222.                             }
  223.                         }catch(javax.jms.JMSException e){
  224.                             this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  225.                             this.log.error("Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage(),e);
  226.                             this.errore = "Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage();
  227.                             return false;
  228.                         }
  229.                     }
  230.                 }else{
  231.                     //System.out.println("TROVATO ["+this.idModulo+"]");
  232.                     break;
  233.                 }
  234.             }
  235.             if(receivedMsg == null){
  236.                 this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  237.                 this.errore = "Riscontrato errore durante ricezione del messaggio: Messaggio non ricevuto";
  238.                 receiver.close();
  239.                 return false;
  240.             }

  241.             // LetturaOggetto interno
  242.             this.received = receivedMsg.getObject();
  243.             this.idHeaderJMS = receivedMsg.getJMSMessageID();

  244.             // Lettura Proprieta'
  245.             this.propertiesReceived = new HashMap<String,java.io.Serializable>();
  246.             try{
  247.                 // IDMessaggio
  248.                 String idMessaggio = receivedMsg.getStringProperty("ID");
  249.                 this.propertiesReceived.put("ID",idMessaggio);
  250.             }   catch(javax.jms.JMSException e){}
  251.             try{
  252.                 // ContenutoRispostaPresente
  253.                 boolean contenutoRisposta = receivedMsg.getBooleanProperty("ContenutoRispostaPresente");
  254.                 this.propertiesReceived.put("ContenutoRispostaPresente",Boolean.valueOf(contenutoRisposta));
  255.             }   catch(javax.jms.JMSException e){}
  256.             // ......
  257.            
  258.             // Rilascio producer
  259.             receiver.close();
  260.        
  261.             this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  262.             return true;

  263.         }catch(Exception e){
  264.             //  Rilascio producer
  265.             try{
  266.                 if(receiver!=null)
  267.                     receiver.close();
  268.             }catch(Exception eClose){}
  269.             if(resource!=null){
  270.                 try{
  271.                     this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
  272.                 }catch(Exception eClose){}  
  273.             }
  274.             this.log.error("Riscontrato errore durante la ricezione da una coda :"+e.getMessage(),e);
  275.             this.errore = "Riscontrato errore durante la ricezione da una coda :"+e.getMessage();  
  276.             return false;
  277.         }

  278.     }






  279.     /**
  280.      * In caso di avvenuto errore in fase di consegna, questo metodo ritorna il motivo dell'errore.
  281.      *
  282.      * @return motivo dell'errore (se avvenuto in fase di consegna).
  283.      *
  284.      */
  285.     public String getErrore(){
  286.         return this.errore;
  287.     }
  288.     /**
  289.      * In caso di ricezione con successo, questo metodo ritorna l'oggetto ricevuto.
  290.      *
  291.      * @return Oggetto ricevuto.
  292.      *
  293.      */
  294.     public Object getObjectReceived(){
  295.         return this.received;
  296.     }
  297.     /**
  298.      * In caso di ricezione con successo, questo metodo ritorna le proprieta' ricevute.
  299.      *
  300.      * @return Proprieta' ricevute.
  301.      *
  302.      */
  303.     public Map<String,java.io.Serializable> getPropertiesReceived(){
  304.         return this.propertiesReceived;
  305.     }

  306.     /**
  307.      * ID JMS presente nell'header del messaggio ricevuto
  308.      *
  309.      * @return ID JMS presente nell'header del messaggio ricevuto
  310.      *
  311.      */
  312.     public String getIdHeaderJMS(){
  313.         return this.idHeaderJMS;
  314.     }
  315. }