NodeReceiverDB.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

package org.openspcoop2.pdd.core.node;

import java.sql.Connection;

import org.openspcoop2.core.id.IDSoggetto;
import org.openspcoop2.pdd.config.DBManager;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.config.Resource;
import org.openspcoop2.pdd.core.AbstractCore;
import org.openspcoop2.pdd.core.GestoreMessaggi;
import org.openspcoop2.pdd.core.PdDContext;
import org.openspcoop2.pdd.core.state.OpenSPCoopStateful;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.pdd.services.core.RicezioneBuste;
import org.openspcoop2.pdd.services.core.RicezioneBusteMessage;
import org.openspcoop2.pdd.services.core.RicezioneContenutiApplicativi;
import org.openspcoop2.pdd.services.core.RicezioneContenutiApplicativiMessage;
import org.openspcoop2.protocol.engine.constants.Costanti;
import org.openspcoop2.protocol.engine.driver.RepositoryBuste;
import org.openspcoop2.protocol.sdk.state.StateMessage;
import org.openspcoop2.protocol.sdk.state.StatefulMessage;
import org.openspcoop2.utils.Utilities;

/**
 * Classe utilizzata per la ricezione di messaggi contenuti nell'architettura di OpenSPCoop (versione DB).
 *
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */

public class NodeReceiverDB extends AbstractCore implements INodeReceiver{


	private static OpenSPCoop2Properties openspcoopProperties = 
		OpenSPCoop2Properties.getInstance();

	
	
	/**
	 * Ricezione di un messaggio  
	 *
	 * @param codicePorta Codice Porta per cui effettuare la receive
	 * @param idModulo Nodo destinatario per cui effettuare la ricezione. 
	 * @param timeout Timeout sulla ricezione
	 * @param checkInterval Intervallo di check sulla coda
	 * @return true se la ricezione JMS e' andata a buon fine, false altrimenti.
	 * 
	 */
	@Override
	public Object receive(MsgDiagnostico msgDiag, IDSoggetto codicePorta, String idModulo, String idMessaggio, 
			long timeout, long checkInterval) throws NodeException,NodeTimeoutException{

		/* ------------  Lettura parametri del messaggio ricevuto e ValidityCheck -------------- */
		Object objReturn = null;

		// connessione al database
		DBManager dbManager = DBManager.getInstance();
		Resource resource = null;
		Connection connectionDB = null;
						
		// GestoreMessaggi
		String tipoMessaggio = null;
		if(idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)){
			tipoMessaggio = Costanti.INBOX;
		}
		else if(idModulo.startsWith(RicezioneBuste.ID_MODULO)){
			tipoMessaggio = Costanti.OUTBOX;
		}
		
		OpenSPCoopStateful openspcoopstate = new OpenSPCoopStateful();
		openspcoopstate.setStatoRichiesta(new StatefulMessage(null, null));
		openspcoopstate.setStatoRisposta(new StatefulMessage(null, null));
		
		GestoreMessaggi gestoreMessaggioRichiesta = new GestoreMessaggi(openspcoopstate, false, idMessaggio,tipoMessaggio,msgDiag,null);

		try{
			long attesa = 0;
			String idRisposta = null;
			GestoreMessaggi gestoreMessaggioRisposta = null;
			boolean messaggioPresente = false;
			
			int refreshOnlyCacheCount = 0;
			
			while(attesa<timeout){

				refreshOnlyCacheCount++;
				msgDiag.highDebug("Analisi richiesta con ID ["+idMessaggio+"] tipo ["+tipoMessaggio+"] contatoreRif["+refreshOnlyCacheCount+"]");
				
				int nodeDBRefresh = NodeReceiverDB.openspcoopProperties.getNodeReceiverCheckDBInterval();
				int nodeRiferimentoMsgRefresh = NodeReceiverDB.openspcoopProperties.getNodeReceiverCheckDBInterval();
				try{
					if(nodeRiferimentoMsgRefresh>2)
						nodeRiferimentoMsgRefresh = nodeDBRefresh / 2;
				}catch(Exception e){
					msgDiag.highDebug("CheckDBInterval (proprieta' 'org.openspcoop.pdd.nodeReceiver.checkDB') non corretto: "+e.getMessage());
				}
				
				msgDiag.highDebug("Proprieta' nodeDBRefresh["+nodeDBRefresh+"] nodeRiferimentoMsgRefresh["+nodeRiferimentoMsgRefresh+"]");
				
				// Il riferimento messaggio e il proprietario puo' essere letto dalla cache.
				// Per migliorare le performance, se non e' presente nella cache si assume anche non presente nel database.
				// - Per rifMsg se si chiama il metodo mapRiferimentoIntoIDBusta(true) l'id viene cercato solo nella cache
				// - Per proprietario se si chiama il metodo getProprietario(true) viene cercato solo nella cache
				// La cache puo' cmq svuotarsi velocemente se la dimensione e' troppo piccola rispetto al numero di msg in parallelo gestiti.
				// Quindi ogni nodeDBRefresh volte (se checkInterval=500 ogni nodeDBRefresh*500 millisecondi) viene controllato anche il database, oltre alla cache.
				boolean checkOnlyCache = NodeReceiverDB.openspcoopProperties.isAbilitataCacheGestoreMessaggi();
				if(refreshOnlyCacheCount==(nodeDBRefresh+1)){
					msgDiag.highDebug("Re-inizializzo contatore refreshOnlyCacheCount");
					refreshOnlyCacheCount = 1;
					checkOnlyCache = false;
				}
				
				// Il riferimentoMessaggio puo' cambiare l'associazione nel tempo.
				// Es. rispostaSincrona che e' un MessaggioErroreProtocollo.... prima viene salvato l'errore, e poi viene salvato il msg ritornato al servizio applicativo.
				// Entrambi, in ordine avranno come key il rifMessaggio.
				// Ogni (nodeDBRefresh/2) volte (se checkInterval=500 ogni (nodeDBRefresh/2)*500 millisecondi) viene fatto il refresh del messaggio
				if( (refreshOnlyCacheCount%nodeRiferimentoMsgRefresh)==0 ){
					msgDiag.highDebug("Re-inizializzo idRisposta e gestoreMessaggioRisposta");
					idRisposta = null;
					gestoreMessaggioRisposta = null;
				}
				
				// Prendo la connessione solo se mi serve per la ricerca su database
				boolean needConnection = false;
				if(NodeReceiverDB.openspcoopProperties.singleConnection_TransactionManager()==false)
					needConnection = true;
				else if(resource==null)
					needConnection = true;
				if( (checkOnlyCache==false) && needConnection ){
					msgDiag.highDebug("Prendo Connessione per NodeReceiver");
					try{
						resource = dbManager.getResource(codicePorta,idModulo, PdDContext.getValue(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE, this.getPddContext()));
					}catch(Exception e){
						throw new NodeException("Impossibile ottenere una Risorsa dal DBManager",e);
					}
					if(resource==null)
						throw new NodeException("Risorsa is null");
					if(resource.getResource() == null)	
						throw new NodeException("Connessione is null");
					connectionDB = (Connection) resource.getResource();
					((StateMessage)openspcoopstate.getStatoRichiesta()).setConnectionDB(connectionDB);
					((StateMessage)openspcoopstate.getStatoRisposta()).setConnectionDB(connectionDB);
				}
				
				// Check esistenza messaggio
				if(idRisposta==null){
					msgDiag.highDebug("Analisi richiesta con ID ["+idMessaggio+"] tipo ["+tipoMessaggio+"]: lettura ID Risposta");
					idRisposta = gestoreMessaggioRichiesta.mapRiferimentoIntoIDBusta(checkOnlyCache); //only cache
				}
				if(idRisposta!=null){
					if(gestoreMessaggioRisposta==null){
						msgDiag.highDebug("Analisi richiesta con ID ["+idMessaggio+"] tipo ["+tipoMessaggio+"]: Costruisco GestoreRisposta per ID["+idRisposta+"]");
						gestoreMessaggioRisposta = new GestoreMessaggi(openspcoopstate, false, idRisposta,tipoMessaggio,msgDiag,null);
						msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"]: existsMessage("+checkOnlyCache+")");
						if(checkOnlyCache)
							messaggioPresente = gestoreMessaggioRisposta.existsMessage_onlyCache();
						else
							messaggioPresente = gestoreMessaggioRisposta.existsMessage();
					}else{
						if(messaggioPresente==false){
							msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"]: existsMessage("+checkOnlyCache+")");
							if(checkOnlyCache)
								messaggioPresente = gestoreMessaggioRisposta.existsMessage_onlyCache();
							else
								messaggioPresente = gestoreMessaggioRisposta.existsMessage();
						}
					}
				}
				
				msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"]: existsMessage="+messaggioPresente);
				
				
				if(messaggioPresente){

					// read Proprietario
					/*String proprietario = null;
					if( idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)) {
						proprietario = gestoreMessaggioRisposta.getProprietario_SerializableRead(NodeReceiverDB.openspcoopProperties.getGestioneSerializableDB_AttesaAttiva(),NodeReceiverDB.openspcoopProperties.getGestioneSerializableDB_CheckInterval());
					} else {
						proprietario = gestoreMessaggioRisposta.getProprietario();
					}*/
					msgDiag.highDebug("getProprietario("+checkOnlyCache+")");
					String proprietario = gestoreMessaggioRisposta.getProprietario(idModulo,checkOnlyCache);
					msgDiag.highDebug("getProprietario("+checkOnlyCache+") proprietario="+proprietario);
					
					// check proprietario
					if( idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)) {
						messaggioPresente = idModulo.equals(proprietario);
					}else if( idModulo.startsWith(RicezioneBuste.ID_MODULO)) {
						messaggioPresente = idModulo.equals(proprietario);
					}
					msgDiag.highDebug("Analisi risposta con ID ["+idRisposta+"] tipo ["+tipoMessaggio+"] proprietario["+proprietario+" existsMessage="+messaggioPresente);
					
				}
				
				// Gestione messaggio
				if(messaggioPresente==false){
					
					//	rilascio e riprendo la connessione ogni checkInterval fino ad un timeout o alla ricezione di un oggetto
					if( (NodeReceiverDB.openspcoopProperties.singleConnection_NodeReceiver()==false) && (checkOnlyCache==false) ){
						msgDiag.highDebug("Rilascio connessione per NodeReceiver");
						dbManager.releaseResource(codicePorta, idModulo, resource);
					}
					
					msgDiag.highDebug("Sleep...");
					Utilities.sleep(checkInterval);
					attesa = attesa + checkInterval;

				}else{
					
					// Prendo informazioni da RepositoryBuste, se non era settata la connesione, la setto
					if( needConnection && checkOnlyCache){
						msgDiag.highDebug("Prendo Connessione per NodeReceiver");
						try{
							resource = dbManager.getResource(codicePorta,idModulo, PdDContext.getValue(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE, this.getPddContext()));
						}catch(Exception e){
							throw new NodeException("Impossibile ottenere una Risorsa dal DBManager",e);
						}
						if(resource==null)
							throw new NodeException("Risorsa is null");
						if(resource.getResource() == null)
							throw new NodeException("Connessione is null");
						connectionDB = (Connection) resource.getResource();
					}
					
					StatefulMessage state = new StatefulMessage(connectionDB, null);
					if(idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)){
						msgDiag.highDebug("Lettura risposta per RicezioneContenutiApplicativi...");
						objReturn = new RicezioneContenutiApplicativiMessage();
						((RicezioneContenutiApplicativiMessage)objReturn).setIdBustaRisposta(idRisposta);
						RepositoryBuste repositoryBuste = new RepositoryBuste(state, false,null);
						((RicezioneContenutiApplicativiMessage)objReturn).setIdCollaborazione(repositoryBuste.getCollaborazioneFromInBox(idRisposta));
						((RicezioneContenutiApplicativiMessage)objReturn).setProfiloCollaborazione(repositoryBuste.getProfiloCollaborazioneFromInBox(idRisposta),
								repositoryBuste.getProfiloCollaborazioneValueFromInBox(idRisposta));
						try{
							((RicezioneContenutiApplicativiMessage)objReturn).setPddContext(gestoreMessaggioRisposta.getPdDContext());
						}catch(Exception e){
							// ignore
						}
						msgDiag.highDebug("Lettura risposta per RicezioneContenutiApplicativi effettuata");
					}
					else if(idModulo.startsWith(RicezioneBuste.ID_MODULO)){
						msgDiag.highDebug("Lettura risposta per RicezioneBuste...");
						objReturn = new RicezioneBusteMessage();
						RepositoryBuste repositoryBuste = new RepositoryBuste(state, false,null);
						if(repositoryBuste.isRegistrataIntoOutBox(idRisposta)){
							((RicezioneBusteMessage)objReturn).setBustaRisposta(repositoryBuste.getBustaFromOutBox(idRisposta));
						}else{
							((RicezioneBusteMessage)objReturn).setIdMessaggioSblocco(idRisposta);
						}
						try{
							((RicezioneBusteMessage)objReturn).setPddContext(gestoreMessaggioRisposta.getPdDContext());
						}catch(Exception e){
							// ignore
						}
						msgDiag.highDebug("Lettura risposta per RicezioneBuste effettuata");
					}
					
					// rilascio e riprendo la connessione ogni checkInterval fino ad un timeout o alla ricezione di un oggetto
					if( (NodeReceiverDB.openspcoopProperties.singleConnection_NodeReceiver()==false) && (checkOnlyCache==false) ){
						msgDiag.highDebug("Rilascio connessione per NodeReceiver");
						dbManager.releaseResource(codicePorta, idModulo, resource);
					}
					
					msgDiag.highDebug("Fine Lettura");
					break;
				}

			}

		} catch (Exception e) {
			throw new NodeException("Riscontrato errore nella ricezione del messaggio di risposta per la gestione della richiesta:"
					+e.getMessage(),e);
		} finally{
			msgDiag.highDebug("Rilascio connessione per NodeReceiver");
			dbManager.releaseResource(codicePorta, idModulo, resource);
		}
		
		if(objReturn == null){
			throw new NodeTimeoutException("Riscontrato errore durante ricezione del messaggio: Messaggio non ricevuto");
		}
		
		return objReturn;
	}

}