JMSReceiver.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import org.slf4j.Logger;
import org.openspcoop2.core.id.IDSoggetto;
import org.openspcoop2.pdd.config.JMSObject;
import org.openspcoop2.pdd.config.QueueManager;
import org.openspcoop2.pdd.config.Resource;
import org.openspcoop2.utils.Utilities;
/**
* Classe utilizzata per ricevere messaggi JMS dai componenti dell'architettura di OpenSPCoop.
*
* @author Poli Andrea (apoli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class JMSReceiver {
/** QueueManager */
private QueueManager qmanager;
/** Indicazione sul modulo che utilizza il Sender */
private String idModulo = null;
/** Indicazione sul codice porta del Sender */
private IDSoggetto codicePorta = null;
/** Object Ricevuto */
private Object received;
/** Proprieta' Ricevute */
private Map<String,java.io.Serializable> propertiesReceived;
/** motivo di un eventuale errore */
private String errore;
/** ID JMS presente nell'header del messaggio ricevuto */
private String idHeaderJMS;
/** Indicazione se deve essere utilizzata una singola connessione JMS */
private boolean singleConnection;
/** Logger */
private Logger log;
/** IDTransazione */
private String idTransazione;
/**
* Costruttore
*
* @param aCodicePorta Codice del dominio che sta gestendo la richiesta.
* @param aIDModulo Identificativo del Receiver.
* @param singleConnection Indicazione se deve essere utilizzata una singola connessione JMS
*
*/
public JMSReceiver(IDSoggetto aCodicePorta,String aIDModulo,boolean singleConnection,Logger log,String idTransazione) throws Exception {
this.codicePorta = aCodicePorta;
this.idModulo = "JMSReceiver."+aIDModulo;
this.singleConnection = singleConnection;
this.qmanager = QueueManager.getInstance();
this.log = log;
this.idTransazione = idTransazione;
}
/**
* Ricezione di un messaggio (Serve per filtrare una coda con un messaggio)
*
* @param destinatario Nodo destinatario per cui effettuare la ricezione.
* @param msgSelector Filtro da utilizzare per la ricezione
* @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
*
*/
public boolean clean(String destinatario,String msgSelector){
return receive(destinatario,msgSelector,1,1);
}
/**
* Ricezione di un messaggio
*
* @param destinatario Nodo destinatario per cui effettuare la ricezione.
* @param timeout Timeout sulla ricezione
* @param checkInterval Intervallo di check sulla coda
* @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
*
*/
public boolean receive(String destinatario,long timeout,long checkInterval){
return receive(destinatario,null,timeout,checkInterval);
}
/**
* Ricezione di un messaggio
*
* @param destinatario Nodo destinatario per cui effettuare la ricezione.
* @param msgSelector Filtro da utilizzare per la ricezione
* @param timeout Timeout sulla ricezione
* @param checkInterval Intervallo di check sulla coda
* @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
*
*/
@SuppressWarnings("resource")
public boolean receive(String destinatario,String msgSelector,long timeout,long checkInterval){
Resource resource = null;
MessageConsumer receiver = null;
try{
JMSObject jmsObject = null;
try{
resource = this.qmanager.getResource(this.codicePorta,this.idModulo,this.idTransazione);
if(resource == null)
throw new JMSException("Resource is null");
if(resource.getResource() == null)
throw new JMSException("JMSObject is null");
jmsObject = (JMSObject) resource.getResource();
if(jmsObject.getConnection()==null)
throw new Exception("Connessione is null");
if(jmsObject.getSession()==null)
throw new Exception("Sessione is null");
}catch(Exception e){
this.log.error("JMSObject non ottenibile dal Pool: "+e.getMessage(),e);
this.errore ="JMSObject non ottenibile dal Pool: "+e.getMessage();
return false;
}
if(destinatario == null){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
return false; // non deve essere ricevuto nulla ??.
}
// Get Coda
Queue queue = this.qmanager.getQueue(destinatario);
if(queue == null){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
this.errore="La coda ["+destinatario+"] non e' tra quelle registrate per OpenSPCoop";
return false;
}
// Start connection
try{
jmsObject.getConnection().start();
}catch(javax.jms.JMSException e){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
this.errore = "Riscontrato errore durante lo start della connessione ["+destinatario+"] :"+e.getMessage();
return false;
}
// Receiver
try{
if(msgSelector != null)
receiver = jmsObject.getSession().createConsumer(queue,msgSelector);
else
receiver = jmsObject.getSession().createConsumer(queue);
}catch(javax.jms.JMSException e){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
this.log.error("Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage(),e);
this.errore = "Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage();
return false;
}
// Ricezione Oggetto
ObjectMessage receivedMsg = null;
long attesa = 0;
while(attesa<timeout){
//System.out.println("WHILE ["+this.idModulo+"]");
receivedMsg = (ObjectMessage) receiver.receive(checkInterval);
attesa = attesa + checkInterval;
if(receivedMsg==null){
// rilascio e riprendo la connessione JMS ogni checkInterval fino ad un timeout o alla ricezione di un oggetto
if(this.singleConnection==false){
//System.out.println("Rilascio ["+this.idModulo+"]");
receiver.close();
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
}
Utilities.sleep(checkInterval);
attesa = attesa + checkInterval;
if(this.singleConnection==false){
try{
resource = this.qmanager.getResource(this.codicePorta,this.idModulo,this.idTransazione);
if(resource == null)
throw new JMSException("Resource is null");
if(resource.getResource() == null)
throw new JMSException("JMSObject is null");
jmsObject = (JMSObject) resource.getResource();
}catch(Exception e){
this.log.error("JMSObject non ottenibile dal Pool: "+e.getMessage(),e);
this.errore ="JMSObject non ottenibile dal Pool: "+e.getMessage();
return false;
}
//System.out.println("Inizializzo ["+this.idModulo+"] ["+msgSelector+"]");
// Start connection
try{
jmsObject.getConnection().start();
}catch(javax.jms.JMSException e){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
this.log.error("Riscontrato errore durante lo start della connessione ["+destinatario+"] :"+e.getMessage(),e);
this.errore = "Riscontrato errore durante lo start della connessione ["+destinatario+"] :"+e.getMessage();
return false;
}
// Reinizializzo Receiver
try{
if(msgSelector != null){
//System.out.println("MSG SELECTOR ["+msgSelector+"]");
receiver = jmsObject.getSession().createConsumer(queue,msgSelector);
}else{
receiver = jmsObject.getSession().createConsumer(queue);
}
}catch(javax.jms.JMSException e){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
this.log.error("Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage(),e);
this.errore = "Riscontrato errore durante la creazione del receiver ["+destinatario+"] :"+e.getMessage();
return false;
}
}
}else{
//System.out.println("TROVATO ["+this.idModulo+"]");
break;
}
}
if(receivedMsg == null){
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
this.errore = "Riscontrato errore durante ricezione del messaggio: Messaggio non ricevuto";
receiver.close();
return false;
}
// LetturaOggetto interno
this.received = receivedMsg.getObject();
this.idHeaderJMS = receivedMsg.getJMSMessageID();
// Lettura Proprieta'
this.propertiesReceived = new HashMap<String,java.io.Serializable>();
try{
// IDMessaggio
String idMessaggio = receivedMsg.getStringProperty("ID");
this.propertiesReceived.put("ID",idMessaggio);
} catch(javax.jms.JMSException e){}
try{
// ContenutoRispostaPresente
boolean contenutoRisposta = receivedMsg.getBooleanProperty("ContenutoRispostaPresente");
this.propertiesReceived.put("ContenutoRispostaPresente",Boolean.valueOf(contenutoRisposta));
} catch(javax.jms.JMSException e){}
// ......
// Rilascio producer
receiver.close();
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
return true;
}catch(Exception e){
// Rilascio producer
try{
if(receiver!=null)
receiver.close();
}catch(Exception eClose){}
if(resource!=null){
try{
this.qmanager.releaseResource(this.codicePorta,this.idModulo,resource);
}catch(Exception eClose){}
}
this.log.error("Riscontrato errore durante la ricezione da una coda :"+e.getMessage(),e);
this.errore = "Riscontrato errore durante la ricezione da una coda :"+e.getMessage();
return false;
}
}
/**
* In caso di avvenuto errore in fase di consegna, questo metodo ritorna il motivo dell'errore.
*
* @return motivo dell'errore (se avvenuto in fase di consegna).
*
*/
public String getErrore(){
return this.errore;
}
/**
* In caso di ricezione con successo, questo metodo ritorna l'oggetto ricevuto.
*
* @return Oggetto ricevuto.
*
*/
public Object getObjectReceived(){
return this.received;
}
/**
* In caso di ricezione con successo, questo metodo ritorna le proprieta' ricevute.
*
* @return Proprieta' ricevute.
*
*/
public Map<String,java.io.Serializable> getPropertiesReceived(){
return this.propertiesReceived;
}
/**
* ID JMS presente nell'header del messaggio ricevuto
*
* @return ID JMS presente nell'header del messaggio ricevuto
*
*/
public String getIdHeaderJMS(){
return this.idHeaderJMS;
}
}