ConnectorApplicativeThreadPool.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.services.connector;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.utils.threads.MonitoredVirtualThreadExecutor;
/**
* ConnectorApplicativeThreadPool
*
* @author Poli Andrea (apoli@link.it)
*
* @author $Author$
* @version $Rev$, $Date$
*/
public class ConnectorApplicativeThreadPool {
private ConnectorApplicativeThreadPool() {}
// I thread attivi oltre ai seguenti sono i worker thread del web server e il pool di thread utilizzato dalla libreria client NIO
/* Per i client della libreria NIO utilizzando CloseableHttpAsyncClient (dalla libreria Apache HttpComponents),
* il pool di thread predefinito dipende dal java.util.concurrent.ExecutorService utilizzato internamente.
* Se non configurato esplicitamente un executor, il client utilizza un DefaultConnectingIOReactor per gestire le connessioni, che a sua volta crea un numero di worker thread pari a:
*
* Runtime.getRuntime().availableProcessors()
*
* Il numero di worker thread è uguale al numero di core della CPU disponibile.
* Il pool utilizza un comportamento predefinito per la gestione degli I/O asincroni.
*
* Ulteriori spiegazioni in org.openspcoop2.pdd.core.connettori.httpcore5.nio.ConnettoreHTTPCOREConnectionManager e nella documentazione BIO/NIO
*/
/**
* thread lanciato in org.openspcoop2.pdd.core.connettori.httpcore5.ConnettoreHTTPCORE
* Viene utilizzato per inviare in streaming i messaggi SOAP tramite l'utilizzo di un PipedUnblockedStream
* Questa funzionalità per default non è attiva.
* Si attiva disabilitando l'utilizzo del MessageObjectEntity per spedire in streaming tramite la proprietà 'org.openspcoop2.pdd.connettori.syncClient.useCustomMessageObjectEntity'
*/
private static ExecutorService syncRequestPool;
/**
* thread lanciato in org.openspcoop2.pdd.services.connector.AbstractRicezioneConnectorAsync
* Viene utilizzato per sganciare subito il worker thread del connettore NIO del webserver, in modo che tutta la logica di gestione della richiesta avvenga su un thread applicativo,
* mentre il worker thread del connettore NIO può esssere utilizzato per accettare una nuova richiesta e/o consegnare una risposta pronta
* Questa funzionalità per default è attiva.
* Si gestisce tramite la proprietà 'org.openspcoop2.pdd.connettori.asyncRequest.stream'
*/
// erogazioni
private static String inRequestThreadPoolId = null;
public String getInRequestThreadPoolId() {
return inRequestThreadPoolId;
}
// fruizioni
private static String outRequestThreadPoolId = null;
public String getOutRequestThreadPoolId() {
return outRequestThreadPoolId;
}
/**
* thread lanciato in org.openspcoop2.pdd.core.connettori.httpcore5.nio.ConnettoreHTTPCOREInputStreamEntityConsumer
* Viene utilizzato per sganciare subito il worker thread del connettore NIO della libreria client, in modo che tutta la logica di gestione della risposta avvenga su un thread applicativo,
* mentre il worker thread del connettore NIO può esssere utilizzato per gestire l'invio di una nuova richiesta e/o ricevere una risposta
* Questa funzionalità per default è attiva.
* Si gestisce tramite la proprietà 'org.openspcoop2.pdd.connettori.asyncResponse.stream'
*/
// erogazioni
private static String outResponseThreadPoolId = null;
public String getOutResponseThreadPoolId() {
return outResponseThreadPoolId;
}
// fruizioni
private static String inResponseThreadPoolId = null;
public String getInResponseThreadPoolId() {
return inResponseThreadPoolId;
}
/** Pool Thread Asincroni */
private static Map<String, ExecutorService> asyncThreadPool = null;
public static void initialize(OpenSPCoop2Properties op2) {
if(!op2.isBIOConfigSyncClientUseCustomMessageObjectEntity()) {
int size = op2.getBIOConfigSyncClientApplicativeThreadPoolSize();
if(size>0) {
syncRequestPool = Executors.newFixedThreadPool(size, new ConnectorApplicativeThreadFactory("request-bio-nonblocking-io"));
}
}
if(op2.isNIOEnabled() &&
(op2.isNIOConfigAsyncRequestStreamEnabled() || op2.isNIOConfigAsyncResponseStreamEnabled())
){
ConnectorAsyncThreadPoolConfig poolConfig = op2.getNIOConfigAsyncThreadPoolConfig();
inRequestThreadPoolId = poolConfig.getInRequestThreadPoolId();
outRequestThreadPoolId = poolConfig.getOutRequestThreadPoolId();
inResponseThreadPoolId = poolConfig.getInResponseThreadPoolId();
outResponseThreadPoolId = poolConfig.getOutResponseThreadPoolId();
asyncThreadPool = new HashMap<>();
for (Map.Entry<String,Boolean> entry : poolConfig.getPoolVirtualThreadType().entrySet()) {
String poolName = entry.getKey();
boolean virtualThreads = entry.getValue();
if(virtualThreads) {
asyncThreadPool.put(poolName, new MonitoredVirtualThreadExecutor(poolName + "-worker-")); // non limita la concorrenza, perché i virtual threads sono leggeri (puoi averne migliaia senza problemi)
}
else {
int size = poolConfig.getPoolSize().get(poolName);
asyncThreadPool.put(poolName, Executors.newFixedThreadPool(size, new ConnectorApplicativeThreadFactory(poolName+"-worker"))); // limita la concorrenza a size thread fisici
}
}
}
}
public static void executeBySyncRequestPool(Runnable runnable) {
syncRequestPool.execute(runnable);
}
public static void executeByAsyncInRequestPool(Runnable runnable) {
asyncThreadPool.get(inRequestThreadPoolId).execute(runnable);
}
public static void executeByAsyncOutRequestPool(Runnable runnable) {
asyncThreadPool.get(outRequestThreadPoolId).execute(runnable);
}
public static void executeByAsyncInResponsePool(Runnable runnable) {
asyncThreadPool.get(inResponseThreadPoolId).execute(runnable);
}
public static void executeByAsyncOutResponsePool(Runnable runnable) {
asyncThreadPool.get(outResponseThreadPoolId).execute(runnable);
}
public static ExecutorService getSyncRequestPool() {
return syncRequestPool;
}
public static ExecutorService getAsyncInRequestPool() {
return asyncThreadPool.get(inRequestThreadPoolId);
}
public static ExecutorService getAsyncOutRequestPool() {
return asyncThreadPool.get(outRequestThreadPoolId);
}
public static ExecutorService getAsyncInResponsePool() {
return asyncThreadPool.get(inResponseThreadPoolId);
}
public static ExecutorService getAsyncOutResponsePool() {
return asyncThreadPool.get(outResponseThreadPoolId);
}
public static void shutdown() {
if(syncRequestPool!=null) {
syncRequestPool.shutdown();
}
for (Map.Entry<String,ExecutorService> entry : asyncThreadPool.entrySet()) {
if(entry.getValue()!=null) {
entry.getValue().shutdown();
}
}
}
public static String getSyncRequestPoolThreadsImage() {
if(syncRequestPool instanceof ThreadPoolExecutor tpe) {
return getThreadsImage(tpe);
}
return null;
}
public static boolean isSyncRequestPoolThreadsEnabled() {
return syncRequestPool!=null;
}
public static String getAsyncPoolThreadsImage() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String,ExecutorService> entry : asyncThreadPool.entrySet()) {
if(entry.getKey()!=null) {
fillAsyncPoolThreadsImage(entry, sb);
}
}
if(sb.length()>0) {
return sb.toString();
}
return null;
}
private static void fillAsyncPoolThreadsImage(Map.Entry<String,ExecutorService> entry, StringBuilder sb) {
if(entry.getValue() instanceof ThreadPoolExecutor tpe) {
if(sb.length()>0) {
sb.append("\n\n");
}
sb.append("[").append(entry.getKey()).append("] ").
append(getThreadsImage(tpe)).append(", virtualThreads:false");
}
else if(entry.getValue() instanceof MonitoredVirtualThreadExecutor tpe) {
if(sb.length()>0) {
sb.append("\n\n");
}
sb.append("[").append(entry.getKey()).append("] ").
append(tpe.getStatus()).append(", virtualThreads:true");
}
}
public static boolean isAsyncPoolThreadsEnabled() {
return asyncThreadPool!=null;
}
private static String getThreadsImage(ThreadPoolExecutor tpe) {
return
String.format("(queue:%d) [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
tpe.getQueue()!=null ? tpe.getQueue().size() : -1,
tpe.getPoolSize(),
tpe.getCorePoolSize(),
tpe.getActiveCount(),
tpe.getCompletedTaskCount(),
tpe.getTaskCount(),
tpe.isShutdown(),
tpe.isTerminated());
}
}
class ConnectorApplicativeThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public ConnectorApplicativeThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(this.namePrefix +"-" + this.threadNumber.getAndIncrement());
return thread;
}
}