TransactionLibrary.java
- /*
- * GovWay - A customizable API Gateway
- * https://govway.org
- *
- * Copyright (c) 2005-2025 Link.it srl (https://link.it).
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 3, as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package org.openspcoop2.monitor.engine.transaction;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import org.openspcoop2.core.commons.dao.DAOFactory;
- import org.openspcoop2.core.id.IDServizio;
- import org.openspcoop2.core.transazioni.Transazione;
- import org.openspcoop2.core.transazioni.TransazioneInfo;
- import org.openspcoop2.core.transazioni.dao.ITransazioneInfoService;
- import org.openspcoop2.core.transazioni.dao.ITransazioneInfoServiceSearch;
- import org.openspcoop2.core.transazioni.dao.ITransazioneServiceSearch;
- import org.openspcoop2.generic_project.beans.NonNegativeNumber;
- import org.openspcoop2.generic_project.exception.NotFoundException;
- import org.openspcoop2.generic_project.expression.IExpression;
- import org.openspcoop2.generic_project.expression.IPaginatedExpression;
- import org.openspcoop2.generic_project.expression.SortOrder;
- import org.openspcoop2.monitor.engine.config.BasicServiceLibrary;
- import org.openspcoop2.monitor.engine.config.BasicServiceLibraryReader;
- import org.openspcoop2.monitor.engine.config.TransactionServiceLibrary;
- import org.openspcoop2.monitor.engine.config.TransactionServiceLibraryReader;
- import org.openspcoop2.utils.Utilities;
- import org.slf4j.Logger;
- /**
- * TransactionLibrary
- *
- * @author Poli Andrea (apoli@link.it)
- * @author $Author$
- * @version $Rev$, $Date$
- */
- public class TransactionLibrary {
-
- private static final String ID_TRANSACTION_PROCESSOR = TransactionLibrary.class.getName();
-
- private static HashMap<String,TransactionServiceLibrary> pluginTransazioni = new HashMap<String, TransactionServiceLibrary>();
- public static TransactionServiceLibrary getTransactionServiceLibrary(IDServizio idServizio,BasicServiceLibraryReader basicServiceLibraryReader,
- TransactionServiceLibraryReader transactionServiceLibraryReader,Logger log) throws Exception{
- TransactionServiceLibrary transactionServiceLibrary = null;
- if(pluginTransazioni.containsKey(idServizio.toString())){
- transactionServiceLibrary = pluginTransazioni.get(idServizio.toString());
- }
- else{
- transactionServiceLibrary = initAndGetTransactionServiceLibrary(idServizio, basicServiceLibraryReader, transactionServiceLibraryReader, log);
- }
- return transactionServiceLibrary;
- }
- private static synchronized TransactionServiceLibrary initAndGetTransactionServiceLibrary(IDServizio idServizio,BasicServiceLibraryReader basicServiceLibraryReader,
- TransactionServiceLibraryReader transactionServiceLibraryReader,Logger log) throws Exception{
- TransactionServiceLibrary transactionServiceLibrary = null;
- if(pluginTransazioni.containsKey(idServizio.toString())){
- transactionServiceLibrary = pluginTransazioni.get(idServizio.toString());
- }
- else{
- BasicServiceLibrary basicServiceLibrary = basicServiceLibraryReader.read(idServizio, log);
- if(basicServiceLibrary!=null){
- transactionServiceLibrary = transactionServiceLibraryReader.readConfigurazioneTransazione(basicServiceLibrary, log);
- }
- if(transactionServiceLibrary==null){
- transactionServiceLibrary = new TransactionServiceLibrary();
- }
- pluginTransazioni.put(idServizio.toString(), transactionServiceLibrary);
- }
- return transactionServiceLibrary;
- }
-
- public static void process(Logger logCore, DAOFactory daoFactory, boolean debug,
- int poolSize, int msgForThread) {
-
- int totaleTransazioni = 0;
- ExecutorService threadsPool = null;
- try {
-
- org.openspcoop2.core.transazioni.dao.IServiceManager serviceManagerTransazioni = (org.openspcoop2.core.transazioni.dao.IServiceManager)
- daoFactory.getServiceManager(org.openspcoop2.core.transazioni.utils.ProjectInfo.getInstance());
- org.openspcoop2.core.plugins.dao.IServiceManager serviceManagerPluginsBase = (org.openspcoop2.core.plugins.dao.IServiceManager)
- daoFactory.getServiceManager(org.openspcoop2.core.plugins.utils.ProjectInfo.getInstance());
- org.openspcoop2.monitor.engine.config.transazioni.dao.IServiceManager serviceManagerPluginsTransazioni = (org.openspcoop2.monitor.engine.config.transazioni.dao.IServiceManager)
- daoFactory.getServiceManager(org.openspcoop2.monitor.engine.config.transazioni.utils.ProjectInfo.getInstance());
- org.openspcoop2.core.commons.search.dao.IServiceManager serviceManagerUtils = (org.openspcoop2.core.commons.search.dao.IServiceManager)
- daoFactory.getServiceManager(org.openspcoop2.core.commons.search.utils.ProjectInfo.getInstance());
-
- threadsPool = Executors.newFixedThreadPool(poolSize);
-
-
- // ServiceLibraryManager
- BasicServiceLibraryReader basicServiceLibraryReader = new BasicServiceLibraryReader(serviceManagerPluginsBase, serviceManagerUtils, debug);
- TransactionServiceLibraryReader transactionServiceLibraryReader = new TransactionServiceLibraryReader(serviceManagerPluginsTransazioni, debug);
-
-
- // Devo recuperare l'ultima data di esecuzione del processor.
- ITransazioneInfoService transazioneInfoDAO = serviceManagerTransazioni.getTransazioneInfoService();
- ITransazioneInfoServiceSearch transazioneInfoSearchDAO = serviceManagerTransazioni.getTransazioneInfoServiceSearch();
- IExpression exprTransactionInfo = transazioneInfoSearchDAO.newExpression();
- exprTransactionInfo.
- equals(TransazioneInfo.model().TIPO, ID_TRANSACTION_PROCESSOR);
- TransazioneInfo tInfo = null;
- Date now = new Date();
- Date lastRunning = null;
- try{
- tInfo = transazioneInfoSearchDAO.find(exprTransactionInfo);
- lastRunning = tInfo.getData();
- }catch(NotFoundException notFound){
- // ignore
- }
-
-
- // Esamino le transazioni da processare
- ITransazioneServiceSearch transazioneSearchDAO = serviceManagerTransazioni.getTransazioneServiceSearch();
- IPaginatedExpression exprTransazioni = transazioneSearchDAO.newPaginatedExpression();
- exprTransazioni.and();
- if(lastRunning!=null){
- exprTransazioni.greaterThan(Transazione.model().DATA_INGRESSO_RICHIESTA, lastRunning);
- }
- exprTransazioni.lessEquals(Transazione.model().DATA_INGRESSO_RICHIESTA, now);
-
- NonNegativeNumber countTransazioniObject = transazioneSearchDAO.count(transazioneSearchDAO.toExpression(exprTransazioni));
- long countTransazioni = countTransazioniObject.longValue();
- logCore.info("Trovate ["+countTransazioni+"] transazioni da processare ...");
-
-
- exprTransazioni.sortOrder(SortOrder.ASC).addOrder(Transazione.model().DATA_INGRESSO_RICHIESTA);
- int limit = msgForThread*poolSize;
- exprTransazioni.limit(limit).offset(0);
- List<String> list = transazioneSearchDAO.findAllIds(exprTransazioni);
-
- Date lastTransactionProcessed = null;
-
- int threadNumber = 1;
-
- while(list!=null && list.size()>0){
-
- threadNumber = 1;
-
- lastTransactionProcessed = null;
-
- List<String> listIdTransazioni = new ArrayList<>();
-
- List<TransactionProcessorThread> processorThreads = new ArrayList<TransactionProcessorThread>();
-
- for (String idTransazione : list) {
- listIdTransazioni.add(idTransazione);
-
- if(listIdTransazioni.size()==msgForThread){
- TransactionProcessorThread pt = new TransactionProcessorThread(listIdTransazioni, logCore,
- basicServiceLibraryReader, transactionServiceLibraryReader, daoFactory,
- threadNumber, debug);
- threadsPool.execute(pt);
- processorThreads.add(pt);
- logCore.info("Avviato Thread #"+threadNumber+" ...");
- listIdTransazioni = new ArrayList<>();
- threadNumber++;
- }
-
- }
- if(listIdTransazioni.size()>0){
- // sono rimasti messaggi da far gestire ad un thread.
- TransactionProcessorThread pt = new TransactionProcessorThread(listIdTransazioni, logCore,
- basicServiceLibraryReader, transactionServiceLibraryReader, daoFactory,
- threadNumber, debug);
- threadsPool.execute(pt);
- processorThreads.add(pt);
- logCore.info("Avviato (coda msg non piena) Thread #"+threadNumber+" ...");
- listIdTransazioni = new ArrayList<>();
- threadNumber++;
- }
-
- int timeout = 10;
- boolean terminated = false;
- while(terminated == false){
- logCore.info((threadNumber-1)+" threads avviati correttamente, attendo terminazione (timeout "+timeout+"s) ...");
- for (int i = 0; i < timeout*4; i++) {
- boolean tmpTerminated = true;
- for (TransactionProcessorThread processorThread : processorThreads) {
- if(processorThread.isFinished()==false){
- tmpTerminated = false;
- break;
- }
- }
- if(tmpTerminated==false){
- Utilities.sleep(250);
- }
- else{
- terminated = true;
- }
- }
- }
-
-
- if(debug)
- logCore.debug("Check Last Update ...");
-
- String error = null;
- for (TransactionProcessorThread processorThread : processorThreads) {
- if(processorThread.getLastDateTransaction()!=null){
- if(lastTransactionProcessed==null){
- lastTransactionProcessed = processorThread.getLastDateTransaction();
- }
- else if(processorThread.getLastDateTransaction().after(lastTransactionProcessed)){
- lastTransactionProcessed = processorThread.getLastDateTransaction();
- }
- }
- if(error==null)
- error = processorThread.getError();
- }
- if(error!=null){
- logCore.error("Threads terminati con errori");
- return;
- }
- if(lastTransactionProcessed!=null){
- if(tInfo==null){
- tInfo = new TransazioneInfo();
- tInfo.setData(lastTransactionProcessed);
- tInfo.setTipo(ID_TRANSACTION_PROCESSOR);
- transazioneInfoDAO.create(tInfo);
- }else{
- tInfo.setData(lastTransactionProcessed);
- transazioneInfoDAO.update(tInfo);
- }
- }
-
- totaleTransazioni = totaleTransazioni + list.size();
-
- // al termine del ciclo si aggiorna il timestamp dell'ultima transazione processata
- logCore.info(totaleTransazioni+"/["+countTransazioni+"] transazioni processate (lastUpdate:"+lastTransactionProcessed+") ...");
- exprTransazioni.offset(totaleTransazioni);
- list = transazioneSearchDAO.findAllIds(exprTransazioni);
- }
-
- logCore.info(countTransazioni+"] transazioni processate correttamente");
- return;
-
- } catch (Exception e) {
- logCore.error("TransactionProcessor ha riscontrato un errore: "+e.getMessage(),e);
- return;
- }finally{
- try{
- logCore.info("Shutdown pool ...");
- threadsPool.shutdown();
- logCore.info("Shutdown pool ok");
- }catch(Throwable e){
- // ignore
- }
- }
- }
- }