TransactionManager.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.node;
import java.sql.Connection;
import org.openspcoop2.core.id.IDSoggetto;
import org.openspcoop2.pdd.config.DBManager;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.config.Resource;
import org.openspcoop2.pdd.core.GestoreMessaggi;
import org.openspcoop2.pdd.core.PdDContext;
import org.openspcoop2.pdd.core.state.OpenSPCoopStateful;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
import org.openspcoop2.pdd.mdb.ConsegnaContenutiApplicativi;
import org.openspcoop2.pdd.mdb.Imbustamento;
import org.openspcoop2.pdd.mdb.ImbustamentoRisposte;
import org.openspcoop2.pdd.mdb.InoltroBuste;
import org.openspcoop2.pdd.mdb.InoltroRisposte;
import org.openspcoop2.pdd.mdb.Sbustamento;
import org.openspcoop2.pdd.mdb.SbustamentoRisposte;
import org.openspcoop2.pdd.services.core.RicezioneBuste;
import org.openspcoop2.pdd.services.core.RicezioneContenutiApplicativi;
import org.openspcoop2.protocol.sdk.state.StatefulMessage;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.date.DateManager;
/**
* Classe che implementa la logica transazionale di OpenSPCoop
*
* @author Poli Andrea (apoli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class TransactionManager {
/** Logger utilizzato per debug. */
private static org.slf4j.Logger log = OpenSPCoop2Logger.getLoggerOpenSPCoopCore();
/** Variabile che indica il Nome del modulo dell'architettura di OpenSPCoop rappresentato da questa classe */
public static final String ID_MODULO = "TransactionManager";
/**
* Quando un modulo di OpenSPCoop chiama questo metodo, si aspetta come risposta una indicazione
* se utilizzare o meno il messaggio ricevuto.
*
* @param idModulo Identificativo del Modulo OpenSPCoop
* @param idBusta Identificativo del Messaggio Ricevuto
* @param tipo Tipo di Messaggio da gestire (INBOX/OUTBOX)
* @param idJMS ID JMS del Messaggio ricevuto
* @return Il metodo ritorna true se il modulo e' autorizzato a processare il messaggio, false altrimenti.
*
*/
public static boolean validityCheck(MsgDiagnostico msgDiag,String idModulo,String idBusta,String tipo,
String idJMS, PdDContext pddContext) throws Exception{
return TransactionManager.validityCheck(msgDiag,idModulo,idBusta,tipo,idJMS,pddContext,null);
}
/**
* Quando un modulo di OpenSPCoop chiama questo metodo, si aspetta come risposta una indicazione
* se utilizzare o meno il messaggio ricevuto.
*
* @param idModulo Identificativo del Modulo OpenSPCoop
* @param idBusta Identificativo del Messaggio Ricevuto
* @param tipo Tipo di Messaggio da gestire (INBOX/OUTBOX)
* @param idJMS ID JMS del Messaggio ricevuto
* @param servizioApplicativo Servizio Applicativo utilizzato come chiave insieme all'id
* @return Il metodo ritorna true se il modulo e' autorizzato a processare il messaggio, false altrimenti.
*
*/
public static boolean validityCheck(MsgDiagnostico msgDiag,String idModulo,String idBusta,String tipo,
String idJMS, PdDContext pddContext,String servizioApplicativo) throws Exception{
// Risorse
OpenSPCoop2Properties properties = OpenSPCoop2Properties.getInstance();
IDSoggetto dominio = properties.getIdentitaPortaDefaultWithoutProtocol();
String idModuloTransaction = TransactionManager.ID_MODULO + "_" + idModulo;
DBManager dbManager = DBManager.getInstance();
Resource resource = null;
OpenSPCoopStateful openspcoopstate = new OpenSPCoopStateful();
StatefulMessage state = new StatefulMessage(null, TransactionManager.log);
openspcoopstate.setStatoRichiesta(state);
GestoreMessaggi msg = new GestoreMessaggi(openspcoopstate, true,idBusta,tipo,msgDiag,null);
long scadenzaWhile = DateManager.getTimeMillis() + properties.getTransactionManager_AttesaAttiva();
try{
// Algoritmo
int refreshOnlyCacheCount = 0;
while( DateManager.getTimeMillis() < scadenzaWhile ){
refreshOnlyCacheCount++;
// Il riferimento messaggio e il proprietario puo' essere letto dalla cache.
// Per migliorare le performance, se non e' presente nella cache si assume anche non presente nel database.
// - Per rifMsg se si chiama il metodo mapRiferimentoIntoIDBusta(true) l'id viene cercato solo nella cache
// - Per proprietario se si chiama il metodo getProprietario(true) viene cercato solo nella cache
// La cache puo' cmq svuotarsi velocemente se la dimensione e' troppo piccola rispetto al numero di msg in parallelo gestiti.
// Quindi ogni 20 volte (se checkInterval=500 ogni 10 secondi) viene controllato anche il database, oltre alla cache.
boolean checkOnlyCache = properties.isAbilitataCacheGestoreMessaggi();
if(refreshOnlyCacheCount==(properties.getTransactionManager_CheckDBInterval())){
msgDiag.highDebug("Re-inizializzo contatore refreshOnlyCacheCount");
refreshOnlyCacheCount = 1;
checkOnlyCache = false;
}
msgDiag.highDebug("Transaction su IDM["+idModulo+"] IDBusta["+idBusta+"] Tipo["+
tipo+"] IDJMS["+idJMS+"] SA["+servizioApplicativo+"] ancora interval["
+properties.getTransactionManager_CheckInterval()+"] millisecondi "+(scadenzaWhile-DateManager.getTimeMillis()));
boolean needConnection = false;
if(properties.singleConnection_TransactionManager()==false)
needConnection = true;
else if(resource==null)
needConnection = true;
if( (checkOnlyCache==false) && needConnection ){
msgDiag.highDebug("Prendo Connessione per TransactionManager");
try{
resource = dbManager.getResource(dominio,idModuloTransaction,PdDContext.getValue(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE, pddContext));
}catch(Exception e){
throw new NodeException("Impossibile ottenere una Risorsa dal DBManager",e);
}
if(resource==null)
throw new NodeException("Risorsa is null");
if(resource.getResource() == null)
throw new NodeException("Connessione is null");
Connection connectionDB = (Connection) resource.getResource();
state.setConnectionDB(connectionDB);
}
msgDiag.highDebug("getProprietario("+checkOnlyCache+")");
String proprietarioMessaggio = msg.getProprietario(idModulo,checkOnlyCache);
msgDiag.highDebug("getProprietario("+checkOnlyCache+") proprietario="+proprietarioMessaggio);
/*
if( (idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)) ||
(ConsegnaContenutiApplicativi.ID_MODULO.equals(idModulo)) ){
//log.info("getProprietarioSerializable");
proprietarioMessaggio = msg.getProprietario_SerializableRead(properties.getGestioneSerializableDB_AttesaAttiva(),
properties.getGestioneSerializableDB_CheckInterval());
} else {
//log.info("getProprietario");
proprietarioMessaggio = msg.getProprietario();
}
*/
if(proprietarioMessaggio==null){
// N.B.: anche se il thread eliminazione messaggi elimina un messaggio, poi comunque dopo 500 tentativi il
// messaggio verra' eliminato!
msgDiag.highDebug("Messaggio per il modulo ["+idModulo+"]: Attesa attiva, modulo precendente [Punto di inizio]");
// Attesa attiva del modulo:
if((properties.singleConnection_TransactionManager()==false) && (checkOnlyCache==false)){
msgDiag.highDebug("Rilascio connessione per TransactionManager");
dbManager.releaseResource(dominio,idModuloTransaction,resource);
}
//try {
//Utilities.sleep((new java.util.Random()).nextInt(properties.getTransactionManager_CheckInterval())); // random da 0ms a TransactionManagerCheckInterval ms
//}catch(Exception eRandom){}
Utilities.sleep(properties.getTransactionManager_CheckInterval());
}
else if(proprietarioMessaggio.equals(idModulo)){
msgDiag.highDebug("Messaggio per il modulo ["+idModulo+"]: proprietario["+proprietarioMessaggio+"] OK");
if( (properties.singleConnection_TransactionManager() && (resource!=null) ) || (checkOnlyCache==false) ){
msgDiag.highDebug("Rilascio connessione per TransactionManager");
dbManager.releaseResource(dominio,idModuloTransaction,resource);
}
return true; // messaggio per il modulo
}
else if( TransactionManager.isModuloPrecedente(idModulo,proprietarioMessaggio,false) == false ){
msgDiag.highDebug("Messaggio per il modulo ["+idModulo+"]: proprietario["+proprietarioMessaggio+"], scarta");
if( (properties.singleConnection_TransactionManager() && (resource!=null) ) || (checkOnlyCache==false) ){
msgDiag.highDebug("Rilascio connessione per TransactionManager");
dbManager.releaseResource(dominio,idModuloTransaction,resource);
}
return false; // scarta messaggio poiche' il messaggio e' gia stato consegnato al modulo successivo
}
else{
// Se avevo onlyCache, non possedevo la connessione
if(needConnection && checkOnlyCache){
msgDiag.highDebug("Prendo Connessione per TransactionManager NeedForJMS");
try{
resource = dbManager.getResource(dominio,idModuloTransaction,PdDContext.getValue(org.openspcoop2.core.constants.Costanti.ID_TRANSAZIONE, pddContext));
}catch(Exception e){
throw new NodeException("Impossibile ottenere una Risorsa dal DBManager",e);
}
if(resource==null)
throw new NodeException("Risorsa is null");
if(resource.getResource() == null)
throw new NodeException("Connessione is null");
Connection connectionDB = (Connection) resource.getResource();
state.setConnectionDB(connectionDB);
}
// Aggiornamento primo msg ricevuto
String idJMSRicevutoPrecedentemente = null;
if( ConsegnaContenutiApplicativi.ID_MODULO.equals(idModulo) ){
idJMSRicevutoPrecedentemente = msg.getIDJMSRicevuto(idModulo,servizioApplicativo);
}else{
idJMSRicevutoPrecedentemente =msg.getIDJMSRicevuto(idModulo);
}
if(idJMSRicevutoPrecedentemente == null){
// Se null, e' la prima copia ricevuta, la memorizzo
if( ConsegnaContenutiApplicativi.ID_MODULO.equals(idModulo) ){
msg.aggiornaIDHeaderJMS(idModulo,idJMS,servizioApplicativo);
}else{
msg.aggiornaIDHeaderJMS(idModulo,idJMS);
}
}else{
// se non e' null guardo se e' una copia per me
if(idJMSRicevutoPrecedentemente.equals(idJMS) == false){
msgDiag.highDebug("Messaggio per il modulo ["+idModulo+"]: un altro messaggio JMS e' gia stato ricevuto, scarta");
msgDiag.highDebug("Rilascio connessione per TransactionManager");
dbManager.releaseResource(dominio,idModuloTransaction,resource);
return false; // scarta messaggio
}
}
msgDiag.highDebug("Messaggio per il modulo ["+idModulo+"]: Attesa attiva, modulo precedente["+proprietarioMessaggio+"]");
// Attesa attiva del modulo:
if( properties.singleConnection_TransactionManager()==false ){
msgDiag.highDebug("Rilascio connessione per TransactionManager");
dbManager.releaseResource(dominio,idModuloTransaction,resource);
}
Utilities.sleep(properties.getTransactionManager_CheckInterval());
}
}
if( properties.singleConnection_TransactionManager() ){
msgDiag.highDebug("Rilascio connessione per TransactionManager");
dbManager.releaseResource(dominio,idModuloTransaction,resource);
}
String msgTerminato = "TransactionManager: Attesa attiva terminata, probabilmente il messaggio e' stato gia gestito ed eliminato IDModulo["+idModulo+"] IDBusta["+idBusta+"] Tipo["+tipo+"]";
msgDiag.highDebug(msgTerminato);
TransactionManager.log.warn(msgTerminato);
return false;
}catch(Exception e){
TransactionManager.log.error("TransactionManager exception: "+e.getMessage(),e);
if(resource != null)
dbManager.releaseResource(dominio,idModuloTransaction,resource);
throw e;
}
}
/**
* Restituisce true se il modulo <var>proprietarioMessaggio</var> e' un modulo precedente, nel flusso dei messaggi
* all'interno dell'architettura di OpenSPCoop del modulo <var>idModulo</var>
*
* @param idModulo Identificatore del nodo che effettua il test.
* @param proprietarioMessaggio 'Potenziale' modulo precedente
*
*/
public static boolean isModuloPrecedente(String idModulo,String proprietarioMessaggio,boolean checkProprietarioNull) throws Exception{
if(idModulo.startsWith(RicezioneContenutiApplicativi.ID_MODULO)){
if(Imbustamento.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(InoltroBuste.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(SbustamentoRisposte.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(Sbustamento.ID_MODULO.equals(proprietarioMessaggio))
return true; // ricevute asincrone!
else
return false;
}
else if(idModulo.startsWith(RicezioneBuste.ID_MODULO)){
if(Sbustamento.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(ConsegnaContenutiApplicativi.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(ImbustamentoRisposte.ID_MODULO.equals(proprietarioMessaggio))
return true;
else
return false;
}
else if(Imbustamento.ID_MODULO.equals(idModulo)){
if(checkProprietarioNull){
if(proprietarioMessaggio == null)
return true; // punto di inizio
}
if(proprietarioMessaggio.startsWith(RicezioneContenutiApplicativi.ID_MODULO))
return true;
else
return false;
}
else if(ImbustamentoRisposte.ID_MODULO.equals(idModulo)){
if(ConsegnaContenutiApplicativi.ID_MODULO.equals(proprietarioMessaggio))
return true;
else
return false;
}
else if(Sbustamento.ID_MODULO.equals(idModulo)){
if(checkProprietarioNull){
if(proprietarioMessaggio == null)
return true; // punto di inizio
}
if(proprietarioMessaggio.startsWith(RicezioneBuste.ID_MODULO))
return true;
else
return false;
}
else if(SbustamentoRisposte.ID_MODULO.equals(idModulo)){
if(InoltroBuste.ID_MODULO.equals(proprietarioMessaggio))
return true;
else
return false;
}
else if(InoltroBuste.ID_MODULO.equals(idModulo)){
if(Imbustamento.ID_MODULO.equals(proprietarioMessaggio))
return true;
else
return false;
}
else if(InoltroRisposte.ID_MODULO.equals(idModulo)){
// E' un punto di uscita senza risposta
// qualsiasi nodo e' un potenziale precedente
return true;
}
else if(ConsegnaContenutiApplicativi.ID_MODULO.equals(idModulo)){
if(Sbustamento.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(SbustamentoRisposte.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(Imbustamento.ID_MODULO.equals(proprietarioMessaggio))
return true;
else if(InoltroBuste.ID_MODULO.equals(proprietarioMessaggio))
return true;
else
return false;
}else{
return false;
}
}
}