TransactionLibrary.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.monitor.engine.transaction;

  21. import java.util.ArrayList;
  22. import java.util.Date;
  23. import java.util.HashMap;
  24. import java.util.List;
  25. import java.util.concurrent.ExecutorService;
  26. import java.util.concurrent.Executors;

  27. import org.openspcoop2.core.commons.dao.DAOFactory;
  28. import org.openspcoop2.core.id.IDServizio;
  29. import org.openspcoop2.core.transazioni.Transazione;
  30. import org.openspcoop2.core.transazioni.TransazioneInfo;
  31. import org.openspcoop2.core.transazioni.dao.ITransazioneInfoService;
  32. import org.openspcoop2.core.transazioni.dao.ITransazioneInfoServiceSearch;
  33. import org.openspcoop2.core.transazioni.dao.ITransazioneServiceSearch;
  34. import org.openspcoop2.generic_project.beans.NonNegativeNumber;
  35. import org.openspcoop2.generic_project.exception.NotFoundException;
  36. import org.openspcoop2.generic_project.expression.IExpression;
  37. import org.openspcoop2.generic_project.expression.IPaginatedExpression;
  38. import org.openspcoop2.generic_project.expression.SortOrder;
  39. import org.openspcoop2.monitor.engine.config.BasicServiceLibrary;
  40. import org.openspcoop2.monitor.engine.config.BasicServiceLibraryReader;
  41. import org.openspcoop2.monitor.engine.config.TransactionServiceLibrary;
  42. import org.openspcoop2.monitor.engine.config.TransactionServiceLibraryReader;
  43. import org.openspcoop2.utils.Utilities;
  44. import org.slf4j.Logger;

  45. /**
  46.  * TransactionLibrary
  47.  *
  48.  * @author Poli Andrea (apoli@link.it)
  49.  * @author $Author$
  50.  * @version $Rev$, $Date$
  51.  */
  52. public class TransactionLibrary {
  53.    
  54.     private static final String ID_TRANSACTION_PROCESSOR = TransactionLibrary.class.getName();
  55.    
  56.     private static HashMap<String,TransactionServiceLibrary> pluginTransazioni = new HashMap<String, TransactionServiceLibrary>();

  57.     public static TransactionServiceLibrary getTransactionServiceLibrary(IDServizio idServizio,BasicServiceLibraryReader basicServiceLibraryReader,
  58.             TransactionServiceLibraryReader transactionServiceLibraryReader,Logger log) throws Exception{
  59.         TransactionServiceLibrary transactionServiceLibrary = null;
  60.         if(pluginTransazioni.containsKey(idServizio.toString())){
  61.             transactionServiceLibrary = pluginTransazioni.get(idServizio.toString());
  62.         }
  63.         else{
  64.             transactionServiceLibrary = initAndGetTransactionServiceLibrary(idServizio, basicServiceLibraryReader, transactionServiceLibraryReader, log);
  65.         }
  66.         return transactionServiceLibrary;
  67.     }
  68.     private static synchronized TransactionServiceLibrary initAndGetTransactionServiceLibrary(IDServizio idServizio,BasicServiceLibraryReader basicServiceLibraryReader,
  69.             TransactionServiceLibraryReader transactionServiceLibraryReader,Logger log) throws Exception{
  70.         TransactionServiceLibrary transactionServiceLibrary = null;
  71.         if(pluginTransazioni.containsKey(idServizio.toString())){
  72.             transactionServiceLibrary = pluginTransazioni.get(idServizio.toString());
  73.         }
  74.         else{
  75.             BasicServiceLibrary basicServiceLibrary = basicServiceLibraryReader.read(idServizio, log);
  76.             if(basicServiceLibrary!=null){
  77.                 transactionServiceLibrary = transactionServiceLibraryReader.readConfigurazioneTransazione(basicServiceLibrary, log);
  78.             }
  79.             if(transactionServiceLibrary==null){
  80.                 transactionServiceLibrary = new TransactionServiceLibrary();
  81.             }
  82.             pluginTransazioni.put(idServizio.toString(), transactionServiceLibrary);
  83.         }
  84.         return transactionServiceLibrary;
  85.     }
  86.    
  87.     public static void process(Logger logCore, DAOFactory daoFactory, boolean debug,
  88.             int poolSize, int msgForThread) {
  89.        
  90.         int totaleTransazioni = 0;
  91.         ExecutorService threadsPool = null;
  92.         try {
  93.            
  94.             org.openspcoop2.core.transazioni.dao.IServiceManager serviceManagerTransazioni = (org.openspcoop2.core.transazioni.dao.IServiceManager)
  95.                     daoFactory.getServiceManager(org.openspcoop2.core.transazioni.utils.ProjectInfo.getInstance());
  96.             org.openspcoop2.core.plugins.dao.IServiceManager serviceManagerPluginsBase = (org.openspcoop2.core.plugins.dao.IServiceManager)
  97.                     daoFactory.getServiceManager(org.openspcoop2.core.plugins.utils.ProjectInfo.getInstance());
  98.             org.openspcoop2.monitor.engine.config.transazioni.dao.IServiceManager serviceManagerPluginsTransazioni = (org.openspcoop2.monitor.engine.config.transazioni.dao.IServiceManager)
  99.                     daoFactory.getServiceManager(org.openspcoop2.monitor.engine.config.transazioni.utils.ProjectInfo.getInstance());
  100.             org.openspcoop2.core.commons.search.dao.IServiceManager serviceManagerUtils = (org.openspcoop2.core.commons.search.dao.IServiceManager)
  101.                     daoFactory.getServiceManager(org.openspcoop2.core.commons.search.utils.ProjectInfo.getInstance());
  102.            
  103.             threadsPool = Executors.newFixedThreadPool(poolSize);
  104.            
  105.            
  106.             // ServiceLibraryManager
  107.             BasicServiceLibraryReader basicServiceLibraryReader = new BasicServiceLibraryReader(serviceManagerPluginsBase, serviceManagerUtils, debug);
  108.             TransactionServiceLibraryReader transactionServiceLibraryReader = new TransactionServiceLibraryReader(serviceManagerPluginsTransazioni, debug);
  109.            
  110.            
  111.             // Devo recuperare l'ultima data di esecuzione del processor.
  112.             ITransazioneInfoService transazioneInfoDAO = serviceManagerTransazioni.getTransazioneInfoService();
  113.             ITransazioneInfoServiceSearch  transazioneInfoSearchDAO = serviceManagerTransazioni.getTransazioneInfoServiceSearch();
  114.             IExpression exprTransactionInfo = transazioneInfoSearchDAO.newExpression();
  115.             exprTransactionInfo.
  116.                 equals(TransazioneInfo.model().TIPO, ID_TRANSACTION_PROCESSOR);
  117.             TransazioneInfo tInfo = null;
  118.             Date now = new Date();
  119.             Date lastRunning = null;
  120.             try{
  121.                 tInfo = transazioneInfoSearchDAO.find(exprTransactionInfo);
  122.                 lastRunning = tInfo.getData();
  123.             }catch(NotFoundException notFound){
  124.                 // ignore
  125.             }
  126.            
  127.            
  128.             // Esamino le transazioni da processare
  129.             ITransazioneServiceSearch transazioneSearchDAO = serviceManagerTransazioni.getTransazioneServiceSearch();
  130.             IPaginatedExpression exprTransazioni = transazioneSearchDAO.newPaginatedExpression();
  131.             exprTransazioni.and();
  132.             if(lastRunning!=null){
  133.                 exprTransazioni.greaterThan(Transazione.model().DATA_INGRESSO_RICHIESTA, lastRunning);
  134.             }
  135.             exprTransazioni.lessEquals(Transazione.model().DATA_INGRESSO_RICHIESTA, now);
  136.                
  137.             NonNegativeNumber countTransazioniObject = transazioneSearchDAO.count(transazioneSearchDAO.toExpression(exprTransazioni));
  138.             long countTransazioni = countTransazioniObject.longValue();
  139.             logCore.info("Trovate ["+countTransazioni+"] transazioni da processare ...");
  140.            
  141.            
  142.             exprTransazioni.sortOrder(SortOrder.ASC).addOrder(Transazione.model().DATA_INGRESSO_RICHIESTA);
  143.             int limit = msgForThread*poolSize;
  144.             exprTransazioni.limit(limit).offset(0);
  145.             List<String> list = transazioneSearchDAO.findAllIds(exprTransazioni);
  146.            
  147.             Date lastTransactionProcessed = null;
  148.            
  149.             int threadNumber = 1;
  150.            
  151.             while(list!=null && list.size()>0){
  152.                
  153.                 threadNumber = 1;
  154.                
  155.                 lastTransactionProcessed = null;
  156.                
  157.                 List<String> listIdTransazioni = new ArrayList<>();
  158.                
  159.                 List<TransactionProcessorThread> processorThreads = new ArrayList<TransactionProcessorThread>();
  160.                
  161.                 for (String idTransazione : list) {

  162.                     listIdTransazioni.add(idTransazione);
  163.                    
  164.                     if(listIdTransazioni.size()==msgForThread){
  165.                         TransactionProcessorThread pt = new TransactionProcessorThread(listIdTransazioni, logCore,
  166.                                 basicServiceLibraryReader, transactionServiceLibraryReader, daoFactory,
  167.                                 threadNumber, debug);
  168.                         threadsPool.execute(pt);
  169.                         processorThreads.add(pt);
  170.                         logCore.info("Avviato Thread #"+threadNumber+" ...");
  171.                         listIdTransazioni = new ArrayList<>();
  172.                         threadNumber++;
  173.                     }
  174.                    
  175.                 }

  176.                 if(listIdTransazioni.size()>0){
  177.                     // sono rimasti messaggi da far gestire ad un thread.
  178.                     TransactionProcessorThread pt = new TransactionProcessorThread(listIdTransazioni, logCore,
  179.                             basicServiceLibraryReader, transactionServiceLibraryReader, daoFactory,
  180.                             threadNumber, debug);
  181.                     threadsPool.execute(pt);
  182.                     processorThreads.add(pt);
  183.                     logCore.info("Avviato (coda msg non piena) Thread #"+threadNumber+" ...");
  184.                     listIdTransazioni = new ArrayList<>();
  185.                     threadNumber++;
  186.                 }
  187.                
  188.                 int timeout = 10;
  189.                 boolean terminated = false;
  190.                 while(terminated == false){
  191.                     logCore.info((threadNumber-1)+" threads avviati correttamente, attendo terminazione (timeout "+timeout+"s) ...");
  192.                     for (int i = 0; i < timeout*4; i++) {
  193.                         boolean tmpTerminated = true;
  194.                         for (TransactionProcessorThread processorThread : processorThreads) {
  195.                             if(processorThread.isFinished()==false){
  196.                                 tmpTerminated = false;
  197.                                 break;
  198.                             }
  199.                         }
  200.                         if(tmpTerminated==false){
  201.                             Utilities.sleep(250);
  202.                         }
  203.                         else{
  204.                             terminated = true;
  205.                         }
  206.                     }
  207.                 }
  208.                
  209.                
  210.                 if(debug)
  211.                     logCore.debug("Check Last Update ...");
  212.                    
  213.                 String error = null;
  214.                 for (TransactionProcessorThread processorThread : processorThreads) {
  215.                     if(processorThread.getLastDateTransaction()!=null){
  216.                         if(lastTransactionProcessed==null){
  217.                             lastTransactionProcessed = processorThread.getLastDateTransaction();
  218.                         }
  219.                         else if(processorThread.getLastDateTransaction().after(lastTransactionProcessed)){
  220.                             lastTransactionProcessed = processorThread.getLastDateTransaction();
  221.                         }
  222.                     }
  223.                     if(error==null)
  224.                         error = processorThread.getError();
  225.                 }
  226.                 if(error!=null){
  227.                     logCore.error("Threads terminati con errori");
  228.                     return;
  229.                 }
  230.                 if(lastTransactionProcessed!=null){
  231.                     if(tInfo==null){
  232.                         tInfo = new TransazioneInfo();
  233.                         tInfo.setData(lastTransactionProcessed);
  234.                         tInfo.setTipo(ID_TRANSACTION_PROCESSOR);
  235.                         transazioneInfoDAO.create(tInfo);
  236.                     }else{
  237.                         tInfo.setData(lastTransactionProcessed);
  238.                         transazioneInfoDAO.update(tInfo);
  239.                     }
  240.                 }
  241.                
  242.                 totaleTransazioni = totaleTransazioni + list.size();
  243.                    
  244.                 // al termine del ciclo si aggiorna il timestamp dell'ultima transazione processata
  245.                 logCore.info(totaleTransazioni+"/["+countTransazioni+"] transazioni processate (lastUpdate:"+lastTransactionProcessed+") ...");
  246.                 exprTransazioni.offset(totaleTransazioni);
  247.                 list = transazioneSearchDAO.findAllIds(exprTransazioni);
  248.             }
  249.                        
  250.             logCore.info(countTransazioni+"] transazioni processate correttamente");
  251.             return;
  252.            
  253.         } catch (Exception e) {
  254.             logCore.error("TransactionProcessor ha riscontrato un errore: "+e.getMessage(),e);
  255.             return;
  256.         }finally{
  257.             try{
  258.                 logCore.info("Shutdown pool ...");
  259.                 threadsPool.shutdown();
  260.                 logCore.info("Shutdown pool ok");
  261.             }catch(Throwable e){
  262.                 // ignore
  263.             }
  264.         }


  265.     }

  266. }