QueueManager.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */



package org.openspcoop2.pdd.config;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Session;

import org.openspcoop2.core.commons.CoreException;
import org.openspcoop2.core.commons.IMonitoraggioRisorsa;
import org.openspcoop2.core.id.IDSoggetto;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.pdd.mdb.ConsegnaContenutiApplicativi;
import org.openspcoop2.pdd.mdb.Imbustamento;
import org.openspcoop2.pdd.mdb.ImbustamentoRisposte;
import org.openspcoop2.pdd.mdb.InoltroBuste;
import org.openspcoop2.pdd.mdb.InoltroRisposte;
import org.openspcoop2.pdd.mdb.Sbustamento;
import org.openspcoop2.pdd.mdb.SbustamentoRisposte;
import org.openspcoop2.pdd.services.core.RicezioneBuste;
import org.openspcoop2.pdd.services.core.RicezioneContenutiApplicativi;
import org.openspcoop2.utils.date.DateManager;
import org.openspcoop2.utils.resources.GestoreJNDI;


/**
 * Contiene la gestione delle connessioni al broker delle code JMS.
 * Il nome della risorsa JNDI da cui e' possibili attingere connessioni verso il Provider JMS, 
 * viene selezionato attraverso le impostazioni lette dal file 'govway.properties'
 * e gestite attraverso l'utilizzo della classe  {@link org.openspcoop2.pdd.config.OpenSPCoop2Properties}.
 *
 *
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */

public class QueueManager implements java.io.Serializable,IMonitoraggioRisorsa{


	/**
	 * 
	 */
	private static final long serialVersionUID = 2542909452367582138L;
	/** Coda su cui il webService 'RicezioneContenutiApplicativi_XXX' sta' attendendo una risposta */
	public static Queue QUEUE_RICEZIONE_CONTENUTI_APPLICATIVI;
	/** Coda su cui il webService 'RicezioneBuste_XXX' sta' attendendo una risposta */
	public static Queue QUEUE_RICEZIONE_BUSTE;
	/** Coda su cui l'MDB 'Imbustamento' sta' attendendo un messaggio */
	public static Queue QUEUE_IMBUSTAMENTO;
	/** Coda su cui l'MDB 'ImbustamentoRisposte' sta' attendendo un messaggio */
	public static Queue QUEUE_IMBUSTAMENTO_RISPOSTE;
	/** Coda su cui l'MDB 'Sbustamento' sta' attendendo un messaggio */
	public static Queue QUEUE_SBUSTAMENTO;
	/** Coda su cui l'MDB 'SbustamentoRisposte' sta' attendendo un messaggio */
	public static Queue QUEUE_SBUSTAMENTO_RISPOSTE;
	/** Coda su cui l'MDB 'InoltroBuste' sta' attendendo un messaggio */
	public static Queue QUEUE_INOLTRO_BUSTE;
	/** Coda su cui l'MDB 'InoltroRisposte' sta' attendendo un messaggio */
	public static Queue QUEUE_INOLTRO_RISPOSTE;
	/** Coda su cui l'MDB 'ConsegnaMessaggiSoap' sta' attendendo un messaggio */
	public static Queue QUEUE_CONSEGNA_CONTENUTI_APPLICATIVI;

	
	/** OpenSPCoopProperties */
	private static OpenSPCoop2Properties openspcoopProperties = OpenSPCoop2Properties.getInstance();
	
	/** Informazione sui proprietari che hanno richiesto una connessione */
	protected static Map<String,Resource> risorseInGestione = new ConcurrentHashMap<String,Resource>();
	
	public static String[] getStatoRisorse() throws Exception{
		
		Object[] o = QueueManager.risorseInGestione.values().toArray(new Resource[0]);
		if(o==null)
			return null;
		Resource[] resources = (Resource[]) o;
		if(resources==null || resources.length<=0)
			return null;
	
		String [] r = new String[resources.length];
		for(int i=0; i<resources.length; i++){
			Resource rr = resources[i];
			r[i] = rr.getIdentificativoPorta()+"."+rr.getModuloFunzionale();
			if(rr.getIdTransazione()!=null){
				r[i] = r[i] +"."+rr.getIdTransazione();
			}
			r[i] = r[i] +" ("+rr.getDate().toString()+")";
		}
		return r;
	}


	/** QueueManager */
	private static QueueManager manager = null;

	/**
	 * Il Metodo si occupa di inizializzare il QueueManager 
	 *
	 * @param jndiName Nome JNDI del QueueConnectionFactory
	 * @param contextFactory Contesto JNDI da utilizzare per la connection factory/openSPCoopQueueManager
	 * 
	 */
	public static void initialize(String jndiName,
			java.util.Properties contextFactory) throws Exception{

		// Provo ad ottenere un QueueManager
		QueueManager.manager = new QueueManager(jndiName,contextFactory);

	}
	
	/**
	 * Il Metodo si occupa di inizializzare le code di ricezione 
	 *
	 * @param contextQueue Contesto JNDI da utilizzare per le code interne
	 * 
	 */
	public static void initializeQueueNodeReceiver(java.util.Properties contextQueue) throws Exception{

		// Tabella per i nomi jndi delle code
		java.util.Map<String,String> nomiJndi = 
			OpenSPCoop2Properties.getInstance().getJNDIQueueName(true,false);
			
		// Inizializzazione Code
		GestoreJNDI jndiQueue = new GestoreJNDI(contextQueue);
		QueueManager.QUEUE_RICEZIONE_CONTENUTI_APPLICATIVI = 
			(Queue) jndiQueue.lookup(nomiJndi.get(RicezioneContenutiApplicativi.ID_MODULO));
		QueueManager.QUEUE_RICEZIONE_BUSTE= 
			(Queue) jndiQueue.lookup(nomiJndi.get(RicezioneBuste.ID_MODULO));

	}


	/**
	 * Il Metodo si occupa di inizializzare le code di spedizione 
	 *
	 * @param contextQueue Contesto JNDI da utilizzare per le code interne
	 * 
	 */
	public static void initializeQueueNodeSender(java.util.Properties contextQueue) throws Exception{

		// Tabella per i nomi jndi delle code
		java.util.Map<String,String> nomiJndi = OpenSPCoop2Properties.getInstance().getJNDIQueueName(false,true);
		
		// Inizializzazione Code
		GestoreJNDI jndiQueue = new GestoreJNDI(contextQueue);
		QueueManager.QUEUE_IMBUSTAMENTO= 
			(Queue) jndiQueue.lookup(nomiJndi.get(Imbustamento.ID_MODULO));
		QueueManager.QUEUE_IMBUSTAMENTO_RISPOSTE= 
			(Queue) jndiQueue.lookup(nomiJndi.get(ImbustamentoRisposte.ID_MODULO));
		QueueManager.QUEUE_SBUSTAMENTO= 
			(Queue) jndiQueue.lookup(nomiJndi.get(Sbustamento.ID_MODULO));
		QueueManager.QUEUE_SBUSTAMENTO_RISPOSTE= 
			(Queue) jndiQueue.lookup(nomiJndi.get(SbustamentoRisposte.ID_MODULO));
		QueueManager.QUEUE_INOLTRO_BUSTE= 
			(Queue) jndiQueue.lookup(nomiJndi.get(InoltroBuste.ID_MODULO));
		QueueManager.QUEUE_INOLTRO_RISPOSTE= 
			(Queue) jndiQueue.lookup(nomiJndi.get(InoltroRisposte.ID_MODULO));
		QueueManager.QUEUE_CONSEGNA_CONTENUTI_APPLICATIVI= 
			(Queue) jndiQueue.lookup(nomiJndi.get(ConsegnaContenutiApplicativi.ID_MODULO));
	}
	
	
	
	
	/**
	 * Ritorna l'istanza di questo QueueManager
	 *
	 * @return Istanza di QueueManager
	 * 
	 */
	public static QueueManager getInstance(){
		return QueueManager.manager;
	}

	
	/**
	 * Restituisce la coda associata al nodo con identificativo <var>nomeNodo</var>.
	 *
	 * @param idNodo Identificatore del nodo.
	 * 
	 */
	public Queue getQueue(String idNodo) throws Exception{
		if(idNodo.startsWith(RicezioneContenutiApplicativi.ID_MODULO))
			return QueueManager.QUEUE_RICEZIONE_CONTENUTI_APPLICATIVI;
		else if(idNodo.startsWith(RicezioneBuste.ID_MODULO))
			return QueueManager.QUEUE_RICEZIONE_BUSTE;
		else if(Imbustamento.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_IMBUSTAMENTO;
		else if(ImbustamentoRisposte.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_IMBUSTAMENTO_RISPOSTE;
		else if(Sbustamento.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_SBUSTAMENTO;
		else if(SbustamentoRisposte.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_SBUSTAMENTO_RISPOSTE;
		else if(InoltroBuste.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_INOLTRO_BUSTE;
		else if(InoltroRisposte.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_INOLTRO_RISPOSTE;
		else if(ConsegnaContenutiApplicativi.ID_MODULO.equals(idNodo))
			return QueueManager.QUEUE_CONSEGNA_CONTENUTI_APPLICATIVI;
		else 
			return null;
	}
	
	
	
	
	
	/** ConnectionFactory dove attingere connessioni */
	private ConnectionFactory qcf = null;
	/** MsgDiagnostico */
	private transient MsgDiagnostico msgDiag = null;
	
	/**
	 * Costruttore
	 *
	 * @param jndiName Nome JNDI del QueueConnectionFactory
	 * @param context Contesto JNDI da utilizzare
	 * 
	 */
	public QueueManager(String jndiName,java.util.Properties context) throws OpenSPCoop2ConfigurationException{

		this.msgDiag = MsgDiagnostico.newInstance("WrapperQueueManager");
		try {
			GestoreJNDI jndi = new GestoreJNDI(context);

			// ConnectionFactory
			this.qcf = (ConnectionFactory) jndi.lookup(jndiName);
		}
		catch(Exception e) {
			throw new OpenSPCoop2ConfigurationException("WrapperQueueManager: "+e.getMessage(),e);
		}

	}





	/**
	 * Ritorna un JMSObject che contiene una connessione/sessione al JMS Broker
	 *
	 * @param idPDD Identificatore della porta di dominio.
	 * @param modulo Modulo che richiede una connessione.
	 * @return JMSObject.
	 * 
	 */
	public Resource getResource(IDSoggetto idPDD,String modulo,String idTransazione) throws OpenSPCoop2ConfigurationException{
		
		Resource risorsa = new Resource();
		try {
			Connection con = this.qcf.createConnection();
			checkConnection(con);

			// Sessione
			Session s = con.createSession(false,QueueManager.openspcoopProperties.getAcknowledgeModeSessioneConnectionFactory());
			if(s == null){
				con.close();
				throw new OpenSPCoop2ConfigurationException("SessioneNonDisponibile");
			}

			// Object JMS
			
			JMSObject jms = new JMSObject();
			jms.setConnection(con);
			jms.setSession(s); 
			
			String idUnivoco = Resource.generaIdentificatoreUnivoco(idPDD, modulo);
			risorsa.setId(idUnivoco);
			risorsa.setDate(DateManager.getDate());
			risorsa.setIdentificativoPorta(idPDD);
			risorsa.setModuloFunzionale(modulo);
			risorsa.setResource(jms);
			risorsa.setResourceType(JMSObject.class.getName());
			risorsa.setIdTransazione(idTransazione);
				
			QueueManager.risorseInGestione.put(idUnivoco, risorsa);
			
			return risorsa;

		}
		catch(Exception e) {
			this.msgDiag.aggiornaFiltri();
			this.msgDiag.setDominio(idPDD);
			this.msgDiag.setFunzione("QueueManager."+modulo);
			this.msgDiag.logFatalError(e, "Richiesta connessione al QueueManager");
			throw new OpenSPCoop2ConfigurationException("getJMSObject: "+e.getMessage());
		}
	}
	private static void checkConnection(Connection con) throws OpenSPCoop2ConfigurationException {
		if(con == null)
			throw new OpenSPCoop2ConfigurationException("ConnessioneNonDisponibile");
	}
	/**
	 * Restituisce un JMSObject al pool
	 *
	 * @param idPDD Identificatore della porta di dominio.
	 * @param modulo Modulo che richiede una connessione.
	 * @param resource JMSObject.
	 * 
	 */
	public void releaseResource(IDSoggetto idPDD,String modulo,Resource resource) throws OpenSPCoop2ConfigurationException{
		try {
			if(resource!=null){
				if(resource.getResource()!=null){
					JMSObject jms = (JMSObject) resource.getResource();
					// Controllo dell'oggetto ricevuto
					if(jms == null){
						throw new OpenSPCoop2ConfigurationException("PassivateObject[JMSObjectNull]");
					}
					if(jms.getConnection()==null){
						throw new OpenSPCoop2ConfigurationException("PassivateObject[ConnectionNull]");
					}
					if(jms.getSession()==null){
						throw new OpenSPCoop2ConfigurationException("PassivateObject[SessionNull]");
					}

					// rilascio
					try{
						jms.getSession().close();
					}catch(Exception e){
						jms.getConnection().close();
						throw e;
					}
					jms.getConnection().close();
				}
				if(QueueManager.risorseInGestione.containsKey(resource.getId()))
					QueueManager.risorseInGestione.remove(resource.getId());
			}
		}
		catch(Exception e) {
			this.msgDiag.aggiornaFiltri();
			this.msgDiag.setDominio(idPDD);
			this.msgDiag.setFunzione("QueueManager."+modulo);
			this.msgDiag.logFatalError(e, "Rilasciata connessione al QueueManager");
			throw new OpenSPCoop2ConfigurationException("releaseJMSObject: "+e.getMessage());
		}
	}
	
	




	/**
	 * Metodo che verica la connessione ad una risorsa.
	 * Se la connessione non e' presente, viene lanciata una eccezione che contiene il motivo della mancata connessione
	 * 
	 * @throws DriverException eccezione che contiene il motivo della mancata connessione
	 */
	@Override
	public void isAlive() throws CoreException{
		Resource resource = null;
		IDSoggetto idSoggettAlive = new IDSoggetto();
		idSoggettAlive.setCodicePorta("QueueManager");
		idSoggettAlive.setTipo("QueueManager");
		idSoggettAlive.setNome("QueueManager");
		try{
			resource = this.getResource(idSoggettAlive, "CheckIsAlive", null);
			if(resource == null)
				throw new Exception("Resource is null");
			if(resource.getResource() == null)
				throw new Exception("JMSObject is null");
			JMSObject jmsObject = (JMSObject) resource.getResource();
			if(jmsObject.getConnection()==null)
				throw new Exception("Connessione is null");
			if(jmsObject.getSession()==null)
				throw new Exception("Sessione is null");
			Connection connectionJMS = jmsObject.getConnection();
			// test
			connectionJMS.getClientID();
		}catch(Exception e){
			throw new CoreException("Connessione al broker JMS non disponibile: "+e.getMessage(),e);
		}finally{
			try{
				this.releaseResource(idSoggettAlive, "CheckIsAlive" ,resource);
			}catch(Exception e){
				// close
			}
		}
	}
}