TransactionLibrary.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.monitor.engine.transaction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.openspcoop2.core.commons.dao.DAOFactory;
import org.openspcoop2.core.id.IDServizio;
import org.openspcoop2.core.transazioni.Transazione;
import org.openspcoop2.core.transazioni.TransazioneInfo;
import org.openspcoop2.core.transazioni.dao.ITransazioneInfoService;
import org.openspcoop2.core.transazioni.dao.ITransazioneInfoServiceSearch;
import org.openspcoop2.core.transazioni.dao.ITransazioneServiceSearch;
import org.openspcoop2.generic_project.beans.NonNegativeNumber;
import org.openspcoop2.generic_project.exception.NotFoundException;
import org.openspcoop2.generic_project.expression.IExpression;
import org.openspcoop2.generic_project.expression.IPaginatedExpression;
import org.openspcoop2.generic_project.expression.SortOrder;
import org.openspcoop2.monitor.engine.config.BasicServiceLibrary;
import org.openspcoop2.monitor.engine.config.BasicServiceLibraryReader;
import org.openspcoop2.monitor.engine.config.TransactionServiceLibrary;
import org.openspcoop2.monitor.engine.config.TransactionServiceLibraryReader;
import org.openspcoop2.utils.Utilities;
import org.slf4j.Logger;
/**
* TransactionLibrary
*
* @author Poli Andrea (apoli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class TransactionLibrary {
private static final String ID_TRANSACTION_PROCESSOR = TransactionLibrary.class.getName();
private static HashMap<String,TransactionServiceLibrary> pluginTransazioni = new HashMap<String, TransactionServiceLibrary>();
public static TransactionServiceLibrary getTransactionServiceLibrary(IDServizio idServizio,BasicServiceLibraryReader basicServiceLibraryReader,
TransactionServiceLibraryReader transactionServiceLibraryReader,Logger log) throws Exception{
TransactionServiceLibrary transactionServiceLibrary = null;
if(pluginTransazioni.containsKey(idServizio.toString())){
transactionServiceLibrary = pluginTransazioni.get(idServizio.toString());
}
else{
transactionServiceLibrary = initAndGetTransactionServiceLibrary(idServizio, basicServiceLibraryReader, transactionServiceLibraryReader, log);
}
return transactionServiceLibrary;
}
private static synchronized TransactionServiceLibrary initAndGetTransactionServiceLibrary(IDServizio idServizio,BasicServiceLibraryReader basicServiceLibraryReader,
TransactionServiceLibraryReader transactionServiceLibraryReader,Logger log) throws Exception{
TransactionServiceLibrary transactionServiceLibrary = null;
if(pluginTransazioni.containsKey(idServizio.toString())){
transactionServiceLibrary = pluginTransazioni.get(idServizio.toString());
}
else{
BasicServiceLibrary basicServiceLibrary = basicServiceLibraryReader.read(idServizio, log);
if(basicServiceLibrary!=null){
transactionServiceLibrary = transactionServiceLibraryReader.readConfigurazioneTransazione(basicServiceLibrary, log);
}
if(transactionServiceLibrary==null){
transactionServiceLibrary = new TransactionServiceLibrary();
}
pluginTransazioni.put(idServizio.toString(), transactionServiceLibrary);
}
return transactionServiceLibrary;
}
public static void process(Logger logCore, DAOFactory daoFactory, boolean debug,
int poolSize, int msgForThread) {
int totaleTransazioni = 0;
ExecutorService threadsPool = null;
try {
org.openspcoop2.core.transazioni.dao.IServiceManager serviceManagerTransazioni = (org.openspcoop2.core.transazioni.dao.IServiceManager)
daoFactory.getServiceManager(org.openspcoop2.core.transazioni.utils.ProjectInfo.getInstance());
org.openspcoop2.core.plugins.dao.IServiceManager serviceManagerPluginsBase = (org.openspcoop2.core.plugins.dao.IServiceManager)
daoFactory.getServiceManager(org.openspcoop2.core.plugins.utils.ProjectInfo.getInstance());
org.openspcoop2.monitor.engine.config.transazioni.dao.IServiceManager serviceManagerPluginsTransazioni = (org.openspcoop2.monitor.engine.config.transazioni.dao.IServiceManager)
daoFactory.getServiceManager(org.openspcoop2.monitor.engine.config.transazioni.utils.ProjectInfo.getInstance());
org.openspcoop2.core.commons.search.dao.IServiceManager serviceManagerUtils = (org.openspcoop2.core.commons.search.dao.IServiceManager)
daoFactory.getServiceManager(org.openspcoop2.core.commons.search.utils.ProjectInfo.getInstance());
threadsPool = Executors.newFixedThreadPool(poolSize);
// ServiceLibraryManager
BasicServiceLibraryReader basicServiceLibraryReader = new BasicServiceLibraryReader(serviceManagerPluginsBase, serviceManagerUtils, debug);
TransactionServiceLibraryReader transactionServiceLibraryReader = new TransactionServiceLibraryReader(serviceManagerPluginsTransazioni, debug);
// Devo recuperare l'ultima data di esecuzione del processor.
ITransazioneInfoService transazioneInfoDAO = serviceManagerTransazioni.getTransazioneInfoService();
ITransazioneInfoServiceSearch transazioneInfoSearchDAO = serviceManagerTransazioni.getTransazioneInfoServiceSearch();
IExpression exprTransactionInfo = transazioneInfoSearchDAO.newExpression();
exprTransactionInfo.
equals(TransazioneInfo.model().TIPO, ID_TRANSACTION_PROCESSOR);
TransazioneInfo tInfo = null;
Date now = new Date();
Date lastRunning = null;
try{
tInfo = transazioneInfoSearchDAO.find(exprTransactionInfo);
lastRunning = tInfo.getData();
}catch(NotFoundException notFound){
// ignore
}
// Esamino le transazioni da processare
ITransazioneServiceSearch transazioneSearchDAO = serviceManagerTransazioni.getTransazioneServiceSearch();
IPaginatedExpression exprTransazioni = transazioneSearchDAO.newPaginatedExpression();
exprTransazioni.and();
if(lastRunning!=null){
exprTransazioni.greaterThan(Transazione.model().DATA_INGRESSO_RICHIESTA, lastRunning);
}
exprTransazioni.lessEquals(Transazione.model().DATA_INGRESSO_RICHIESTA, now);
NonNegativeNumber countTransazioniObject = transazioneSearchDAO.count(transazioneSearchDAO.toExpression(exprTransazioni));
long countTransazioni = countTransazioniObject.longValue();
logCore.info("Trovate ["+countTransazioni+"] transazioni da processare ...");
exprTransazioni.sortOrder(SortOrder.ASC).addOrder(Transazione.model().DATA_INGRESSO_RICHIESTA);
int limit = msgForThread*poolSize;
exprTransazioni.limit(limit).offset(0);
List<String> list = transazioneSearchDAO.findAllIds(exprTransazioni);
Date lastTransactionProcessed = null;
int threadNumber = 1;
while(list!=null && list.size()>0){
threadNumber = 1;
lastTransactionProcessed = null;
List<String> listIdTransazioni = new ArrayList<>();
List<TransactionProcessorThread> processorThreads = new ArrayList<TransactionProcessorThread>();
for (String idTransazione : list) {
listIdTransazioni.add(idTransazione);
if(listIdTransazioni.size()==msgForThread){
TransactionProcessorThread pt = new TransactionProcessorThread(listIdTransazioni, logCore,
basicServiceLibraryReader, transactionServiceLibraryReader, daoFactory,
threadNumber, debug);
threadsPool.execute(pt);
processorThreads.add(pt);
logCore.info("Avviato Thread #"+threadNumber+" ...");
listIdTransazioni = new ArrayList<>();
threadNumber++;
}
}
if(listIdTransazioni.size()>0){
// sono rimasti messaggi da far gestire ad un thread.
TransactionProcessorThread pt = new TransactionProcessorThread(listIdTransazioni, logCore,
basicServiceLibraryReader, transactionServiceLibraryReader, daoFactory,
threadNumber, debug);
threadsPool.execute(pt);
processorThreads.add(pt);
logCore.info("Avviato (coda msg non piena) Thread #"+threadNumber+" ...");
listIdTransazioni = new ArrayList<>();
threadNumber++;
}
int timeout = 10;
boolean terminated = false;
while(terminated == false){
logCore.info((threadNumber-1)+" threads avviati correttamente, attendo terminazione (timeout "+timeout+"s) ...");
for (int i = 0; i < timeout*4; i++) {
boolean tmpTerminated = true;
for (TransactionProcessorThread processorThread : processorThreads) {
if(processorThread.isFinished()==false){
tmpTerminated = false;
break;
}
}
if(tmpTerminated==false){
Utilities.sleep(250);
}
else{
terminated = true;
}
}
}
if(debug)
logCore.debug("Check Last Update ...");
String error = null;
for (TransactionProcessorThread processorThread : processorThreads) {
if(processorThread.getLastDateTransaction()!=null){
if(lastTransactionProcessed==null){
lastTransactionProcessed = processorThread.getLastDateTransaction();
}
else if(processorThread.getLastDateTransaction().after(lastTransactionProcessed)){
lastTransactionProcessed = processorThread.getLastDateTransaction();
}
}
if(error==null)
error = processorThread.getError();
}
if(error!=null){
logCore.error("Threads terminati con errori");
return;
}
if(lastTransactionProcessed!=null){
if(tInfo==null){
tInfo = new TransazioneInfo();
tInfo.setData(lastTransactionProcessed);
tInfo.setTipo(ID_TRANSACTION_PROCESSOR);
transazioneInfoDAO.create(tInfo);
}else{
tInfo.setData(lastTransactionProcessed);
transazioneInfoDAO.update(tInfo);
}
}
totaleTransazioni = totaleTransazioni + list.size();
// al termine del ciclo si aggiorna il timestamp dell'ultima transazione processata
logCore.info(totaleTransazioni+"/["+countTransazioni+"] transazioni processate (lastUpdate:"+lastTransactionProcessed+") ...");
exprTransazioni.offset(totaleTransazioni);
list = transazioneSearchDAO.findAllIds(exprTransazioni);
}
logCore.info(countTransazioni+"] transazioni processate correttamente");
return;
} catch (Exception e) {
logCore.error("TransactionProcessor ha riscontrato un errore: "+e.getMessage(),e);
return;
}finally{
try{
logCore.info("Shutdown pool ...");
threadsPool.shutdown();
logCore.info("Shutdown pool ok");
}catch(Throwable e){
// ignore
}
}
}
}