TimerFileSystemRecoveryThread.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.timers;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import org.openspcoop2.core.commons.CoreException;
import org.openspcoop2.core.commons.dao.DAOFactory;
import org.openspcoop2.core.commons.dao.DAOFactoryProperties;
import org.openspcoop2.core.config.OpenspcoopAppender;
import org.openspcoop2.core.config.Property;
import org.openspcoop2.core.config.utils.OpenSPCoopAppenderUtilities;
import org.openspcoop2.generic_project.utils.ServiceManagerProperties;
import org.openspcoop2.monitor.engine.fs_recovery.FSRecoveryConfig;
import org.openspcoop2.monitor.engine.fs_recovery.FSRecoveryLibrary;
import org.openspcoop2.monitor.engine.fs_recovery.FSRecoveryObjectType;
import org.openspcoop2.pdd.config.DBTransazioniManager;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.config.Resource;
import org.openspcoop2.pdd.core.CostantiPdD;
import org.openspcoop2.pdd.core.GestoreMessaggi;
import org.openspcoop2.pdd.logger.MsgDiagnosticiProperties;
import org.openspcoop2.pdd.logger.MsgDiagnostico;
import org.openspcoop2.protocol.sdk.diagnostica.IDiagnosticProducer;
import org.openspcoop2.protocol.sdk.dump.IDumpProducer;
import org.openspcoop2.protocol.sdk.tracciamento.ITracciaProducer;
import org.openspcoop2.utils.TipiDatabase;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.date.DateManager;
import org.openspcoop2.utils.id.serial.InfoStatistics;
import org.openspcoop2.utils.semaphore.Semaphore;
import org.openspcoop2.utils.semaphore.SemaphoreConfiguration;
import org.openspcoop2.utils.semaphore.SemaphoreMapping;
import org.openspcoop2.utils.threads.BaseThread;
import org.slf4j.Logger;
/**
* TimerFileSystemRecoveryThread
*
* @author Poli Andrea (poli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class TimerFileSystemRecoveryThread extends BaseThread{
private static TimerState STATE = TimerState.OFF; // abilitato in OpenSPCoop2Startup al momento dell'avvio
public static TimerState getSTATE() {
return STATE;
}
public static void setSTATE(TimerState sTATE) {
STATE = sTATE;
}
public static final String ID_MODULO = "TimerFileSystemRecovery";
/** Numero massimo di iterazioni per evitare loop infinito */
private static final int MAX_ITERATIONS = 10000;
/** Logger utilizzato per debug. */
private Logger logCore = null;
private Logger logSql = null;
private Logger logTimer = null;
/** Indicazione se deve essere effettuato il log delle query */
private boolean debug = false;
private boolean recoveryEventi;
private boolean recoveryTransazioni;
private long recoveryEventiProcessingFileAfterMs;
private long recoveryTransazioniProcessingFileAfterMs;
private long recoveryMaxFileLimit;
private OpenSPCoop2Properties properties;
/** Database */
private String tipoDatabaseRuntime = null; //tipoDatabase
private DAOFactory daoFactory = null;
private Logger daoFactoryLogger = null;
private ServiceManagerProperties daoFactoryServiceManagerPropertiesTransazioni = null;
private ServiceManagerProperties daoFactoryServiceManagerPropertiesPluginsEventi = null;
/** OpenSPCoopAppender */
/** Appender personalizzati per i tracciamenti di OpenSPCoop */
private ITracciaProducer loggerTracciamentoOpenSPCoopAppender = null;
/** Appender personalizzati per i messaggi diagnostici di OpenSPCoop2 */
private IDiagnosticProducer loggerMsgDiagnosticoOpenSPCoopAppender = null;
/** Appender personalizzati per i dump di OpenSPCoop2 */
private IDumpProducer loggerDumpOpenSPCoopAppender = null;
private boolean transazioniRegistrazioneDumpHeadersCompactEnabled = false;
private static final String ID_TIMER = "__timerFileSystemRecovery";
private TimerLock timerLock = null;
/** Semaforo */
private Semaphore semaphore = null;
private InfoStatistics semaphore_statistics;
private MsgDiagnostico msgDiag = null;
/** Costruttore
* @throws CoreException */
public TimerFileSystemRecoveryThread(Logger logCore, Logger logSql, Logger logTimer) throws TimerException, CoreException{
this.properties = OpenSPCoop2Properties.getInstance();
this.logCore = logCore;
this.logSql = logSql;
this.logTimer = logTimer;
this.setTimeout(this.properties.getFileSystemRecoveryTimerIntervalSeconds());
try {
this.msgDiag = MsgDiagnostico.newInstance(ID_MODULO);
this.msgDiag.setPrefixMsgPersonalizzati(MsgDiagnosticiProperties.MSG_DIAG_TIMER_FILESYSTEM_RECOVERY);
this.msgDiag.addKeyword(CostantiPdD.KEY_TIMER, ID_MODULO);
} catch (Exception e) {
String msgErrore = "Riscontrato Errore durante l'inizializzazione del MsgDiagnostico";
this.logTimerError(msgErrore,e);
throw new TimerException(msgErrore,e);
}
this.msgDiag.logPersonalizzato("avvioInCorso");
this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("avvioInCorso"));
this.debug = this.properties.isFileSystemRecoveryDebug();
this.recoveryEventi = this.properties.isFileSystemRecoveryTimerEventEnabled();
this.recoveryTransazioni = this.properties.isFileSystemRecoveryTimerTransactionEnabled();
this.recoveryEventiProcessingFileAfterMs = this.properties.getFileSystemRecoveryEventsProcessingFileAfterMs();
this.recoveryTransazioniProcessingFileAfterMs = this.properties.getFileSystemRecoveryTransactionProcessingFileAfterMs();
this.recoveryMaxFileLimit = this.properties.getFileSystemRecoveryMaxFileLimit();
DAOFactoryProperties daoFactoryProperties = null;
try{
this.tipoDatabaseRuntime = this.properties.getDatabaseType();
if(this.tipoDatabaseRuntime==null){
throw new TimerException("Tipo Database non definito");
}
// DAOFactory
this.daoFactoryLogger = this.logSql;
this.daoFactory = DAOFactory.getInstance(this.daoFactoryLogger);
daoFactoryProperties = DAOFactoryProperties.getInstance(this.daoFactoryLogger);
if(this.recoveryTransazioni){
this.daoFactoryServiceManagerPropertiesTransazioni = daoFactoryProperties.getServiceManagerProperties(org.openspcoop2.core.transazioni.utils.ProjectInfo.getInstance());
this.daoFactoryServiceManagerPropertiesTransazioni.setShowSql(this.debug);
this.daoFactoryServiceManagerPropertiesTransazioni.setDatabaseType(DBTransazioniManager.getInstance().getTipoDatabase());
}
if(this.recoveryEventi){
this.daoFactoryServiceManagerPropertiesPluginsEventi = daoFactoryProperties.getServiceManagerProperties(org.openspcoop2.core.eventi.utils.ProjectInfo.getInstance());
this.daoFactoryServiceManagerPropertiesPluginsEventi.setShowSql(this.debug);
this.daoFactoryServiceManagerPropertiesPluginsEventi.setDatabaseType(DBTransazioniManager.getInstance().getTipoDatabase());
}
}catch(Exception e){
throw new TimerException("Errore durante l'inizializzazione del datasource: "+e.getMessage(),e);
}
if(this.recoveryTransazioni){
boolean usePdDConnection = true;
try{
// Init
this.loggerTracciamentoOpenSPCoopAppender = new org.openspcoop2.pdd.logger.TracciamentoOpenSPCoopProtocolAppender();
OpenspcoopAppender tracciamentoOpenSPCoopAppender = new OpenspcoopAppender();
tracciamentoOpenSPCoopAppender.setTipo(ID_TIMER);
List<Property> tracciamentoOpenSPCoopAppenderProperties = new ArrayList<>();
// Verra poi utilizzata la connessione ottenuta ogni volta che il timer viene eseguito, infatti si usa usePdDConnection
OpenSPCoopAppenderUtilities.addParameters(this.daoFactoryLogger, tracciamentoOpenSPCoopAppenderProperties,
null, // nessun datasource
null, null, null, null, // nessuna connection
this.tipoDatabaseRuntime,
usePdDConnection, // viene usata la connessione della PdD
this.debug
);
OpenSPCoopAppenderUtilities.addCheckProperties(tracciamentoOpenSPCoopAppenderProperties, false);
tracciamentoOpenSPCoopAppender.setPropertyList(tracciamentoOpenSPCoopAppenderProperties);
this.loggerTracciamentoOpenSPCoopAppender.initializeAppender(tracciamentoOpenSPCoopAppender);
this.loggerTracciamentoOpenSPCoopAppender.isAlive();
}catch(Exception e){
throw new TimerException("Errore durante l'inizializzazione del TracciamentoAppender: "+e.getMessage(),e);
}
try{
// Init
this.loggerMsgDiagnosticoOpenSPCoopAppender = new org.openspcoop2.pdd.logger.MsgDiagnosticoOpenSPCoopProtocolAppender();
OpenspcoopAppender diagnosticoOpenSPCoopAppender = new OpenspcoopAppender();
diagnosticoOpenSPCoopAppender.setTipo(ID_TIMER);
List<Property> diagnosticoOpenSPCoopAppenderProperties = new ArrayList<>();
// Verra poi utilizzata la connessione ottenuta ogni volta che il timer viene eseguito, infatti si usa usePdDConnection
OpenSPCoopAppenderUtilities.addParameters(this.daoFactoryLogger, diagnosticoOpenSPCoopAppenderProperties,
null, // nessun datasource
null, null, null, null, // nessuna connection
this.tipoDatabaseRuntime,
usePdDConnection, // viene usata la connessione della PdD
this.debug
);
OpenSPCoopAppenderUtilities.addCheckProperties(diagnosticoOpenSPCoopAppenderProperties, false);
diagnosticoOpenSPCoopAppender.setPropertyList(diagnosticoOpenSPCoopAppenderProperties);
this.loggerMsgDiagnosticoOpenSPCoopAppender.initializeAppender(diagnosticoOpenSPCoopAppender);
this.loggerMsgDiagnosticoOpenSPCoopAppender.isAlive();
}catch(Exception e){
throw new TimerException("Errore durante l'inizializzazione del DiagnosticoAppender: "+e.getMessage(),e);
}
try{
// Init
this.loggerDumpOpenSPCoopAppender = new org.openspcoop2.pdd.logger.DumpOpenSPCoopProtocolAppender();
OpenspcoopAppender dumpOpenSPCoopAppender = new OpenspcoopAppender();
dumpOpenSPCoopAppender.setTipo(ID_TIMER);
List<Property> dumpOpenSPCoopAppenderProperties = new ArrayList<>();
// Verra poi utilizzata la connessione ottenuta ogni volta che il timer viene eseguito, infatti si usa usePdDConnection
OpenSPCoopAppenderUtilities.addParameters(this.daoFactoryLogger, dumpOpenSPCoopAppenderProperties,
null, // nessun datasource
null, null, null, null, // nessuna connection
this.tipoDatabaseRuntime,
usePdDConnection, // viene usata la connessione della PdD
this.debug
);
OpenSPCoopAppenderUtilities.addCheckProperties(dumpOpenSPCoopAppenderProperties, false);
dumpOpenSPCoopAppender.setPropertyList(dumpOpenSPCoopAppenderProperties);
this.loggerDumpOpenSPCoopAppender.initializeAppender(dumpOpenSPCoopAppender);
this.loggerDumpOpenSPCoopAppender.isAlive();
// Indicazioni sulle modalita' di salvataggio degli header del dump
this.transazioniRegistrazioneDumpHeadersCompactEnabled = this.properties.isTransazioniRegistrazioneDumpHeadersCompactEnabled();
}catch(Exception e){
throw new TimerException("Errore durante l'inizializzazione del DumpAppender: "+e.getMessage(),e);
}
}
if(this.properties.isFileSystemRecoveryLockEnabled()) {
this.timerLock = new TimerLock(TipoLock.GESTIONE_FILESYSTEM_TRACE_RECOVERY);
if(this.properties.isTimerLockByDatabase()) {
this.semaphore_statistics = new InfoStatistics();
SemaphoreConfiguration config = GestoreMessaggi.newSemaphoreConfiguration(this.properties.getFileSystemRecoveryLockMaxLife(),
this.properties.getFileSystemRecoveryLockIdleTime());
TipiDatabase databaseType = TipiDatabase.toEnumConstant(this.properties.getDatabaseType());
try {
this.semaphore = new Semaphore(this.semaphore_statistics, SemaphoreMapping.newInstance(this.timerLock.getIdLock()),
config, databaseType, this.logTimer);
}catch(Exception e) {
throw new TimerException(e.getMessage(),e);
}
}
}
String sec = "secondi";
if(this.getTimeout() == 1)
sec = "secondo";
this.msgDiag.addKeyword(CostantiPdD.KEY_TIMEOUT, this.getTimeout()+" "+sec);
this.msgDiag.logPersonalizzato("avvioEffettuato");
this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("avvioEffettuato"));
}
private FSRecoveryConfig conf = null;
@Override
public boolean initialize(){
try{
this.conf = new FSRecoveryConfig(false);
this.conf.setLogCore(this.logCore);
this.conf.setLogSql(this.logSql);
this.conf.setDebug(this.debug);
this.conf.setDefaultProtocol(this.properties.getDefaultProtocolName());
this.conf.setRepository(this.properties.getFileSystemRecoveryRepository().getAbsolutePath());
this.conf.setRipristinoEventi(this.recoveryEventi);
this.conf.setRipristinoTransazioni(this.recoveryTransazioni);
this.conf.setTentativi(this.properties.getFileSystemRecoveryMaxAttempts());
this.conf.setProcessingEventFileAfterMs(this.recoveryEventiProcessingFileAfterMs);
this.conf.setProcessingTransactionFileAfterMs(this.recoveryTransazioniProcessingFileAfterMs);
this.conf.setMaxFileLimit(this.recoveryMaxFileLimit);
return true;
}catch(Exception e){
this.logCore.error("Errore durante il recovery da file system (InitConfigurazione): "+e.getMessage(),e);
return false;
}
}
@Override
public void process(){
if(TimerState.ENABLED.equals(STATE)) {
DBTransazioniManager dbManager = null;
Resource r = null;
try{
dbManager = DBTransazioniManager.getInstance();
r = dbManager.getResource(this.properties.getIdentitaPortaDefaultWithoutProtocol(), ID_MODULO, null);
if(r==null){
throw new CoreException("Risorsa al database non disponibile");
}
Connection con = (Connection) r.getResource();
if(con == null)
throw new TimerException("Connessione non disponibile");
org.openspcoop2.core.transazioni.dao.IServiceManager transazioniSM = null;
if(this.recoveryTransazioni){
transazioniSM = (org.openspcoop2.core.transazioni.dao.IServiceManager)
this.daoFactory.getServiceManager(org.openspcoop2.core.transazioni.utils.ProjectInfo.getInstance(), con,
this.daoFactoryServiceManagerPropertiesTransazioni, this.daoFactoryLogger);
}
org.openspcoop2.core.eventi.dao.IServiceManager pluginsSM = null;
if(this.recoveryEventi){
pluginsSM = (org.openspcoop2.core.eventi.dao.IServiceManager)
this.daoFactory.getServiceManager(org.openspcoop2.core.eventi.utils.ProjectInfo.getInstance(), con,
this.daoFactoryServiceManagerPropertiesPluginsEventi, this.daoFactoryLogger);
}
String causa = "Recupero transazioni da file system";
try{
if(this.properties.isFileSystemRecoveryLockEnabled()) {
GestoreMessaggi.acquireLock(
this.semaphore, con, this.timerLock,
this.msgDiag, causa,
this.properties.getFileSystemRecoveryGetLockAttesaAttiva(),
this.properties.getFileSystemRecoveryGetLockCheckInterval());
}
// Eventi
List<FSRecoveryObjectType> eventi = FSRecoveryObjectType.getOperazioniEventi();
if(eventi!=null && !eventi.isEmpty()) {
boolean maxIterationsReached = false;
for (FSRecoveryObjectType fsRecoveryObjectType : eventi) {
int iteration = 0;
long processed = 0;
do {
processed = process(fsRecoveryObjectType, con,
transazioniSM,
pluginsSM);
iteration++;
if(iteration >= MAX_ITERATIONS) {
String msg = "Raggiunto numero massimo di iterazioni ("+MAX_ITERATIONS+") per il tipo '"+fsRecoveryObjectType+"' (Eventi)";
this.logCore.warn(msg);
this.logTimer.warn(msg);
maxIterationsReached = true;
break;
}
} while(processed > 0);
if(maxIterationsReached) {
// voglio che vega comuunque terminata la gestione delle transazioni
break;
}
}
}
// Transazioni
List<FSRecoveryObjectType> transazioni = FSRecoveryObjectType.getOperazioniTransazioni();
if(transazioni!=null && !transazioni.isEmpty()) {
boolean maxIterationsReached = false;
for (FSRecoveryObjectType fsRecoveryObjectType : transazioni) {
int iteration = 0;
long processed = 0;
do {
processed = process(fsRecoveryObjectType, con,
transazioniSM,
pluginsSM);
iteration++;
if(iteration >= MAX_ITERATIONS) {
String msg = "Raggiunto numero massimo di iterazioni ("+MAX_ITERATIONS+") per il tipo '"+fsRecoveryObjectType+"' (Transazioni)";
this.logCore.warn(msg);
this.logTimer.warn(msg);
maxIterationsReached = true;
break;
}
} while(processed > 0);
if(maxIterationsReached) {
// voglio che vega comuunque terminata la gestione delle transazioni
break;
}
}
}
}finally{
if(this.properties.isFileSystemRecoveryLockEnabled()) {
try{
GestoreMessaggi.releaseLock(
this.semaphore, con, this.timerLock,
this.msgDiag, causa);
}catch(Exception e){
// ignore
}
}
}
}catch(Exception e){
this.logCore.error("Errore durante il recovery da file system: "+e.getMessage(),e);
}finally{
try{
if(r!=null)
dbManager.releaseResource(this.properties.getIdentitaPortaDefaultWithoutProtocol(), ID_MODULO, r);
}catch(Exception eClose){
// ignore
}
}
}
else {
this.logCore.info("Timer "+ID_MODULO+" disabilitato");
this.msgDiag.logPersonalizzato("disabilitato");
this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("disabilitato"));
}
}
@Override
public void close(){
this.logCore.info("Thread per il recovery da file system terminato");
}
private long process(FSRecoveryObjectType objectType, Connection con,
org.openspcoop2.core.transazioni.dao.IServiceManager transazioniSM,
org.openspcoop2.core.eventi.dao.IServiceManager pluginsSM) {
long startGenerazione = DateManager.getTimeMillis();
this.msgDiag.addKeyword(CostantiPdD.KEY_TIPO_RECORD, objectType.name()); // riferito a intervallo
this.msgDiag.logPersonalizzato("recovery.inCorso");
this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("recovery.inCorso"));
if(this.properties.isFileSystemRecoveryLockEnabled()) {
try{
GestoreMessaggi.updateLock(
this.semaphore, con, this.timerLock,
this.msgDiag, "Recovery '"+objectType+"' ...");
}catch(Throwable e){
this.msgDiag.logErroreGenerico(e,ID_MODULO+"-UpdateLock");
this.logTimer.error(ID_MODULO+"-UpdateLock: "+e.getMessage(),e);
return -1;
}
}
long l = FSRecoveryLibrary.generate(this.conf,
this.daoFactory, this.daoFactoryLogger, this.daoFactoryServiceManagerPropertiesTransazioni,
this.properties.getGestioneSerializableDBAttesaAttiva(), this.properties.getGestioneSerializableDBCheckInterval(),
transazioniSM,
this.loggerTracciamentoOpenSPCoopAppender,
this.loggerMsgDiagnosticoOpenSPCoopAppender,
this.loggerDumpOpenSPCoopAppender, this.transazioniRegistrazioneDumpHeadersCompactEnabled,
pluginsSM, con,
objectType);
long endGenerazione = DateManager.getTimeMillis();
String tempoImpiegato = Utilities.convertSystemTimeIntoStringMillisecondi((endGenerazione-startGenerazione), true);
this.msgDiag.addKeyword(CostantiPdD.KEY_TEMPO_GENERAZIONE, tempoImpiegato);
this.msgDiag.addKeyword(CostantiPdD.KEY_NUMERO_RECORD, l+"");
this.msgDiag.logPersonalizzato("recovery.effettuata");
this.logTimer.info(this.msgDiag.getMessaggio_replaceKeywords("recovery.effettuata"));
return l;
}
private void logTimerError(String msgErrore, Exception e) {
if(this.logTimer!=null) {
this.logTimer.error(msgErrore,e);
}
}
}