GestoreCodaRunnable.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */



  20. package org.openspcoop2.utils.threads;

  21. import java.util.ArrayList;
  22. import java.util.HashMap;
  23. import java.util.List;
  24. import java.util.Map;
  25. import java.util.Set;
  26. import java.util.concurrent.ExecutorService;
  27. import java.util.concurrent.Executors;
  28. import java.util.concurrent.ThreadPoolExecutor;

  29. import org.openspcoop2.utils.UtilsException;
  30. import org.slf4j.Logger;

  31. /**
  32.  * GestoreCodaRunnable
  33.  *
  34.  *  
  35.  * @author Poli Andrea (apoli@link.it)
  36.  * @author $Author$
  37.  * @version $Rev$, $Date$
  38.  */
  39. public class GestoreCodaRunnable extends BaseThread{
  40.    
  41.     /** Logger utilizzato per debug. */
  42.     private RunnableLogger log = null;
  43.    
  44.     /** ThreadsPool */
  45.     private ExecutorService threadsPool = null;
  46.     private int poolSize = -1;
  47.     private int queueSize = -1;
  48.     private int limit = -1;
  49.     private Map<String, Runnable> threads = new HashMap<>();
  50.        
  51.     /** Instance */
  52.     private IGestoreCodaRunnableInstance gestoreRunnable;
  53.    
  54.     /** Nome */
  55.     private String name;
  56.    

  57.    
  58.     public String getThreadsImage() {
  59.         if(this.threadsPool instanceof ThreadPoolExecutor) {
  60.             ThreadPoolExecutor tpe = (ThreadPoolExecutor) this.threadsPool;
  61.             return
  62.                     String.format("(queue:%d) [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
  63.                             this.threads.size(),
  64.                             tpe.getPoolSize(),
  65.                             tpe.getCorePoolSize(),
  66.                             tpe.getActiveCount(),
  67.                             tpe.getCompletedTaskCount(),
  68.                             tpe.getTaskCount(),
  69.                             tpe.isShutdown(),
  70.                             tpe.isTerminated());
  71.         }
  72.         return null;
  73.     }
  74.    
  75.    
  76.     /** Costruttore */
  77.     public GestoreCodaRunnable(String name, int poolSize, int queueSize, int limit, int timeoutNextCheck, IGestoreCodaRunnableInstance gestoreRunnable, Logger log) throws UtilsException{
  78.        
  79.         this.name = name;
  80.         this.log = new RunnableLogger(name, log);
  81.         this.gestoreRunnable = gestoreRunnable;
  82.        
  83.         try {
  84.             if(this.gestoreRunnable!=null) {
  85.                 this.gestoreRunnable.initialize(this.log);
  86.             }
  87.         }catch(Throwable t) {
  88.             throw new UtilsException(t.getMessage(),t);
  89.         }  

  90.         try {

  91.             this.poolSize = poolSize;
  92.             if(this.poolSize>0) {
  93.                 this.threadsPool = Executors.newFixedThreadPool(this.poolSize);
  94.                 this.log.info("Inizializzato correttamente");
  95.             }
  96.             else {
  97.                 this.log.error("Non sono stati definiti threads");
  98.             }
  99.            
  100.             this.queueSize = queueSize;
  101.             if(this.queueSize<=0) {
  102.                 this.log.error("Non è stata definita la dimensione della coda");
  103.             }
  104.            
  105.             this.limit = limit;
  106.             if(this.limit<=0) {
  107.                 this.log.error("Non è stata definito il limite di quanti thread creare per volta");
  108.             }
  109.            
  110.             this.setTimeout(timeoutNextCheck);
  111.             if(this.limit<=0) {
  112.                 this.log.error("Non è stata definito il timeout di attesa prima di verificare la presenza di nuovi threads da attivare");
  113.             }
  114.            
  115.         }catch(Exception e) {
  116.             throw new UtilsException("Inizializzazione pool di threads non riuscita: "+e.getMessage(),e);
  117.         }
  118.        
  119.        
  120.     }
  121.    
  122.     @Override
  123.     public void process(){
  124.         // nop: ho ridefinito il metodo run
  125.     }
  126.    
  127.     @Override
  128.     public void run(){
  129.        
  130.         try {
  131.        
  132.             if(this.threadsPool==null) {
  133.                 return; // termino subito
  134.             }
  135.            
  136.             HashMap<String, Object> context = new HashMap<>();
  137.             this.gestoreRunnable.logCheckInProgress(context);
  138.            
  139.             while(this.isStop() == false) {
  140.                            
  141.                 // Print actual image status
  142.                 this.log.info("Immagine prima del controllo sui threads terminati: "+this.getThreadsImage());
  143.                
  144.                 // Verifico se nella tabella dei threads registrati vi sono thread terminati
  145.                 if(!this.threads.isEmpty()) {
  146.                     this.log.debug("Verifico se tra i threads registrati ve ne sono alcuni terminati ...");
  147.                     List<String> ids = new ArrayList<>();
  148.                     ids.addAll(this.threads.keySet());
  149.                     for (String id : ids) {
  150.                         Runnable r = this.threads.get(id);
  151.                         if(r.isFinished()) {
  152.                             this.log.debug("Elimino dalla coda thread '"+id+"' terminato");
  153.                             this.threads.remove(id);
  154.                         }
  155.                     }
  156.                 }
  157.                
  158.                 // Print actual image status
  159.                 this.log.info("Immagine dopo il controllo sui threads terminati: "+this.getThreadsImage());
  160.                
  161.                 // Se vi è la possibilità di inserire in coda nuovi threads lo faccio
  162.                 int limit = this.queueSize - this.threads.size();
  163.                 boolean sleep = false;
  164.                 if(limit>0) {
  165.                     if(limit>this.limit) {
  166.                         limit = this.limit;
  167.                     }
  168.                     this.log.info("Ricerco nuovi threads da attivare (limit: "+limit+") ...");
  169.                     List<Runnable> list_nextRunnable = null;
  170.                     try {
  171.                         list_nextRunnable = this.gestoreRunnable.nextRunnable(limit);
  172.                     }catch(Throwable t) {
  173.                         this.log.error("Errore durante la ricerca di nuovi threads (limit: "+limit+"): "+t.getMessage(),t);
  174.                     }
  175.                     if(list_nextRunnable!=null && !list_nextRunnable.isEmpty()) {
  176.                         this.log.info("Trovati "+list_nextRunnable.size()+" threads da attivare");
  177.                         for (Runnable thread : list_nextRunnable) {
  178.                             String threadName = this.name+"-t"+getUniqueSerialNumber();
  179.                             if(thread.getIdentifier()!=null && !"".equals(thread.getIdentifier())) {
  180.                                 threadName = threadName+"-"+thread.getIdentifier();
  181.                             }
  182.                             try {
  183.                                 this.log.debug("Aggiungo in coda nuovo thread '"+threadName+"' ...");
  184.                                 thread.initialize(new RunnableLogger(threadName,this.log.getLog()));
  185.                                 this.threadsPool.execute(thread);
  186.                                 this.threads.put(threadName, thread);
  187.                                 this.log.info("Thread '"+threadName+"' aggiunto in coda");
  188.                             }catch(Throwable t) {
  189.                                 this.log.error("Errore durante l'aggiunta in coda del thread '"+threadName+"': "+t.getMessage(),t);
  190.                             }
  191.                         }
  192.                        
  193.                         // Print actual image status
  194.                         this.log.info("Immagine dopo l'inserimento in coda dei nuovi threads: "+this.getThreadsImage());
  195.                        
  196.                         this.gestoreRunnable.logRegisteredThreads(context, list_nextRunnable.size());
  197.                     }
  198.                     else {
  199.                         this.log.info("Trovati "+0+" threads da attivare");
  200.                         sleep = true;
  201.                     }
  202.                 }
  203.                 else {
  204.                     this.log.info("La coda dei threads ha raggiunto la capacità massima (size: "+this.queueSize+")");
  205.                     sleep = true;
  206.                 }
  207.                
  208.                 if(sleep) {
  209.                    
  210.                     this.gestoreRunnable.logCheckFinished(context);
  211.                    
  212.                     this.sleepForNextCheck(this.getTimeout(), 1000);
  213.                    
  214.                     context = new HashMap<>();
  215.                     this.gestoreRunnable.logCheckInProgress(context);
  216.                 }
  217.             }
  218.    
  219.            
  220.             try {      
  221.                 this.log.debug("Richiedo sospensione threads ...");
  222.                 // Fermo threads
  223.                 Set<String> keySet = this.threads.keySet();
  224.                 for (String threadName : keySet) {
  225.                     Runnable thread = this.threads.get(threadName);
  226.                     thread.setStop(true);
  227.                 }          
  228.             }catch(Throwable t) {
  229.                 this.log.error("Errore durante lo stop dei threads: "+t.getMessage(),t);
  230.             }
  231.                
  232.             try{
  233.                 // Attendo chiusura dei threads
  234.                 int timeout = 10;
  235.                 boolean terminated = false;
  236.                 while(terminated == false){
  237.                     this.log.info((this.threads.size())+" threads avviati correttamente, attendo terminazione (timeout "+timeout+"s) ...");
  238.                     for (int i = 0; i < timeout*4; i++) {
  239.                         boolean tmpTerminated = true;
  240.                         Set<String> keySet = this.threads.keySet();
  241.                         for (String threadName : keySet) {
  242.                             Runnable thread = this.threads.get(threadName);
  243.                             if(thread.isFinished()==false){
  244.                                 tmpTerminated = false;
  245.                                 break;
  246.                             }
  247.                         }
  248.                         if(tmpTerminated==false){
  249.                             org.openspcoop2.utils.Utilities.sleep(250);
  250.                         }
  251.                         else{
  252.                             terminated = true;
  253.                         }
  254.                     }
  255.                 }
  256.                 this.log.info((this.threads.size())+" threads avviati correttamente, attesa della terminazione (timeout "+timeout+"s) completata");
  257.                
  258.             }catch(Exception e){
  259.                 this.log.error("Errore durante l'attesa della terminazione dei threads: "+e.getMessage(),e);
  260.             }finally{
  261.             }
  262.        
  263.         }finally {
  264.             this.finished();
  265.         }
  266.        
  267.     }
  268.    
  269.     private long uniqueSerialNumber = 0;
  270.     private synchronized long getUniqueSerialNumber(){
  271.         if((this.uniqueSerialNumber+1) > Long.MAX_VALUE){
  272.             this.uniqueSerialNumber = 0;
  273.         }
  274.         this.uniqueSerialNumber++;
  275.         return this.uniqueSerialNumber;
  276.     }
  277. }