// AbstractRicezioneConnectorAsync.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.services.connector;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Date;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.openspcoop2.message.exception.ParseException;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.core.CostantiPdD;
import org.openspcoop2.pdd.core.PdDContext;
import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
import org.openspcoop2.pdd.services.connector.messages.ConnectorInMessage;
import org.openspcoop2.pdd.services.connector.messages.ConnectorOutMessage;
import org.openspcoop2.pdd.services.connector.messages.HttpServletConnectorAsyncInMessage;
import org.openspcoop2.pdd.services.connector.messages.HttpServletConnectorAsyncOutMessage;
import org.openspcoop2.pdd.services.connector.messages.HttpServletConnectorInMessage;
import org.openspcoop2.pdd.services.error.AbstractErrorGenerator;
import org.openspcoop2.pdd.services.service.IRicezioneService;
import org.openspcoop2.protocol.sdk.IProtocolFactory;
import org.openspcoop2.protocol.sdk.constants.CodiceErroreIntegrazione;
import org.openspcoop2.protocol.sdk.constants.ErroreIntegrazione;
import org.openspcoop2.protocol.sdk.constants.ErroriIntegrazione;
import org.openspcoop2.protocol.sdk.constants.IDService;
import org.openspcoop2.protocol.sdk.constants.IntegrationFunctionError;
import org.openspcoop2.protocol.sdk.state.RequestInfo;
import org.openspcoop2.utils.TimeoutIOException;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.date.DateManager;
import org.openspcoop2.utils.io.notifier.unblocked.IPipedUnblockedStream;
import org.openspcoop2.utils.io.notifier.unblocked.PipedUnblockedStreamFactory;
import org.openspcoop2.utils.transport.http.HttpRequestMethod;
import org.slf4j.Logger;
/**
 * Base class for the asynchronous (Servlet non-blocking I/O) entry points.
 * <p>
 * {@link #doEngine(RequestInfo, HttpServletRequest, HttpServletResponse, HttpRequestMethod)} puts the
 * request in asynchronous mode, reads the body through a {@link ReadListener} and hands the
 * request/response connector messages to the {@link IRicezioneService} created by the subclass.
 * The body is either streamed to the service through a piped stream while it is still being read,
 * or fully buffered in memory before the service is invoked, depending on the configuration.
 *
 * @author Poli Andrea (apoli@link.it)
 * @author Lorenzo Nardi (nardi@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public abstract class AbstractRicezioneConnectorAsync {

	/**
	 * @return the identifier of the service implemented by the subclass; it is passed to the
	 *         connector messages and drives the choice of pipe timeout label and worker pool
	 */
	protected abstract IDService getIdService();

	/**
	 * @return the module identifier passed to the connector messages
	 */
	protected abstract String getIdModulo();

	/**
	 * Builds the error generator used for every error response produced by this connector.
	 *
	 * @param logCore core logger
	 * @param requestInfo information about the current request
	 * @return the error generator
	 * @throws ConnectorException if the generator cannot be initialized
	 */
	protected abstract AbstractErrorGenerator getErrorGenerator(Logger logCore, RequestInfo requestInfo) throws ConnectorException;

	/**
	 * Writes an error directly on the servlet response. It is invoked when the error generator
	 * could not be created, so implementations must cope with a {@code null} {@code generatoreErrore}.
	 *
	 * @param requestInfo information about the current request
	 * @param generatoreErrore error generator, possibly {@code null}
	 * @param erroreIntegrazione integration error to report
	 * @param integrationFunctionError functional classification of the error
	 * @param t cause of the error
	 * @param res servlet response on which the error is written
	 * @param logCore core logger
	 */
	protected abstract void doError(RequestInfo requestInfo, AbstractErrorGenerator generatoreErrore,
			ErroreIntegrazione erroreIntegrazione, IntegrationFunctionError integrationFunctionError,
			Throwable t, HttpServletResponse res, Logger logCore);

	/**
	 * Writes an error on the connector output message.
	 *
	 * @param requestInfo information about the current request
	 * @param generatoreErrore error generator
	 * @param erroreIntegrazione integration error to report
	 * @param integrationFunctionError functional classification of the error
	 * @param t cause of the error
	 * @param parseException parse error details, may be {@code null}
	 * @param res connector output message on which the error is written
	 * @param log logger
	 * @param clientError whether the error has to be reported as a client error
	 * @return information about the produced error, later handed to
	 *         {@link #emitTransaction(Logger, ConnectorInMessage, PdDContext, Date, ConnectorDispatcherInfo)}
	 * @throws ConnectorException if the error response cannot be produced
	 */
	protected abstract ConnectorDispatcherErrorInfo doError(RequestInfo requestInfo, AbstractErrorGenerator generatoreErrore,
			ErroreIntegrazione erroreIntegrazione, IntegrationFunctionError integrationFunctionError,
			Throwable t, ParseException parseException,
			ConnectorOutMessage res, Logger log, boolean clientError) throws ConnectorException;

	/**
	 * Emits the transaction for a request that was terminated by this connector.
	 *
	 * @param logCore logger
	 * @param req connector input message
	 * @param pddContext request context; this class passes {@code null} when the body read fails
	 * @param dataAccettazioneRichiesta instant at which the request was accepted
	 * @param info information about the response that was produced
	 */
	protected abstract void emitTransaction(Logger logCore, ConnectorInMessage req, PdDContext pddContext, Date dataAccettazioneRichiesta, ConnectorDispatcherInfo info);

	/**
	 * @return the timeout set on the {@link AsyncContext} through {@link AsyncContext#setTimeout(long)}
	 */
	protected abstract long getTimeout();

	/**
	 * Creates the service that processes the request.
	 *
	 * @param errorGenerator error generator to be used by the service
	 * @return a new service instance
	 */
	protected abstract IRicezioneService newRicezioneService(AbstractErrorGenerator errorGenerator);

	/**
	 * Handles a request in asynchronous mode.
	 * <p>
	 * Steps: create the error generator (on failure an error is written on {@code res} and the
	 * method returns), start the async context, build the connector in/out messages, register the
	 * {@link ReadListener} that consumes the body and the {@link AsyncListener} that records
	 * timeouts and I/O errors on the output message. When streaming is enabled the service is
	 * started immediately on a worker pool and reads the body from a pipe; otherwise the service
	 * is invoked from {@link ReadListener#onAllDataRead()} on the fully buffered body.
	 *
	 * @param requestInfo information about the current request
	 * @param req servlet request
	 * @param res servlet response
	 * @param method HTTP method of the request (currently not used, see the comment in the body)
	 * @throws ServletException if the connector messages or the pipe cannot be created
	 * @throws IOException if the request input stream cannot be obtained
	 */
	public void doEngine(RequestInfo requestInfo,
			HttpServletRequest req, HttpServletResponse res, HttpRequestMethod method) throws ServletException, IOException {

		Date dataAccettazioneRichiesta = DateManager.getDate();
		Logger logCore = OpenSPCoop2Logger.getLoggerOpenSPCoopCore();

		// The HTTP method is not used here: the invoked API has to be read first
		// in order to determine the effective service binding.

		AbstractErrorGenerator generatoreErrore = null;
		try{
			generatoreErrore = getErrorGenerator(logCore, requestInfo);
		}catch(Exception e){
			String msg = "Inizializzazione Generatore Errore fallita: "+Utilities.readFirstErrorValidMessageFromException(e);
			logCore.error(msg,e);
			doError(requestInfo, generatoreErrore, // doError handles a null generatoreErrore
					ErroriIntegrazione.ERRORE_5XX_GENERICO_PROCESSAMENTO_MESSAGGIO.
						get5XX_ErroreProcessamento(msg,CodiceErroreIntegrazione.CODICE_501_PDD_NON_INIZIALIZZATA),
					IntegrationFunctionError.GOVWAY_NOT_INITIALIZED, e, res, logCore);
			return;
		}

		AsyncContext ac = req.startAsync();

		HttpServletConnectorAsyncInMessage httpIn = null;
		try{
			httpIn = new HttpServletConnectorAsyncInMessage(requestInfo, req, getIdService(), getIdModulo());
		}catch(Exception e){
			// always throws ServletException
			doError("HttpServletConnectorInMessage init error", e);
		}

		// Best effort: the output message is built with a null protocol factory if it cannot be read.
		IProtocolFactory<?> protocolFactory = null;
		try{
			protocolFactory = httpIn.getProtocolFactory();
		}catch(Exception e){
			logCore.debug("ProtocolFactory not available: "+e.getMessage(),e);
		}

		HttpServletConnectorAsyncOutMessage httpOut = null;
		try{
			httpOut = new HttpServletConnectorAsyncOutMessage(requestInfo, protocolFactory, ac,
					getIdService(), getIdModulo());
		}catch(Exception e){
			// always throws ServletException
			doError("HttpServletConnectorOutMessage init error", e);
		}

		OpenSPCoop2Properties op2Properties = OpenSPCoop2Properties.getInstance();
		boolean stream = op2Properties.isNIOConfigAsyncRequestStreamEnabled();
		int dimensioneBuffer = op2Properties.getNIOConfigAsyncRequestPipedUnblockedStreamBuffer();
		long timeout = getTimeout();

		// The listeners receive the non effectively-final locals through an init(...) method.
		req.getInputStream().setReadListener( (new ReadListener() {

			private ServletInputStream is = null;
			private Logger log;
			private AbstractErrorGenerator generatoreErrore;
			private HttpServletConnectorAsyncInMessage httpIn;
			private HttpServletConnectorAsyncOutMessage httpOut;
			private boolean stream;
			private ByteArrayOutputStream os = null; // buffered mode: holds the whole request body
			private IPipedUnblockedStream pipe; // stream mode: body is forwarded while being read

			/**
			 * Stores the collaborators and prepares the destination of the body:
			 * a pipe installed as input stream of the in-message (stream mode) or an in-memory buffer.
			 */
			public ReadListener init(Logger log, ServletInputStream is, boolean stream, int sizeBuffer,
					AbstractErrorGenerator generatoreErrore,
					HttpServletConnectorAsyncInMessage httpIn, HttpServletConnectorAsyncOutMessage httpOut ) throws ServletException {
				this.is = is;
				this.log = log;
				this.generatoreErrore = generatoreErrore;
				this.httpIn = httpIn;
				this.httpOut = httpOut;
				this.stream = stream;
				if(stream) {
					try {
						this.pipe = PipedUnblockedStreamFactory.newPipedUnblockedStream(this.log, sizeBuffer,
								(IDService.PORTA_APPLICATIVA.equals(getIdService()) || IDService.PORTA_APPLICATIVA_NIO.equals(getIdService())) ?
										CostantiPdD.CONNETTORE_READ_CONNECTION_TIMEOUT_CONSEGNA_CONTENUTI_APPLICATIVI :
										CostantiPdD.CONNETTORE_READ_CONNECTION_TIMEOUT_INOLTRO_BUSTE, // updated later inside HttpServletConnectorAsyncInMessage
								CostantiPdD.CONNETTORE_FASE_GESTIIONE_RICHIESTA);
					}catch(Throwable t) {
						this.log.error("Istanziazione PipedStream fallita: "+t.getMessage(),t);
						throw new ServletException(t.getMessage(),t);
					}
					this.httpIn.updateInputStream(this.pipe);
				}
				else {
					this.os = new ByteArrayOutputStream();
				}
				return this;
			}

			/**
			 * Reports a failure while reading the request: writes the error response,
			 * emits the transaction and always completes the async context.
			 */
			@Override
			public void onError(Throwable t) {
				String msg = "Avvenuto errore durante la lettura della richiesta: "+Utilities.readFirstErrorValidMessageFromException(t);
				this.log.error(msg,t);
				// NOTE(review): in stream mode the pipe is not closed on this path;
				// verify how the worker reading from it gets released.
				try {
					ConnectorDispatcherErrorInfo cInfo = doError(requestInfo, this.generatoreErrore, // doError handles a null generatoreErrore
							ErroriIntegrazione.ERRORE_5XX_GENERICO_PROCESSAMENTO_MESSAGGIO.
								get5XX_ErroreProcessamento(msg,CodiceErroreIntegrazione.CODICE_509_READ_REQUEST_MSG),
							IntegrationFunctionError.INTERNAL_REQUEST_ERROR, t, null, this.httpOut, this.log, ConnectorDispatcherUtils.GENERAL_ERROR);
					emitTransaction(this.log, this.httpIn, null, dataAccettazioneRichiesta, cInfo);
				}catch(Exception e) {
					this.log.error("Gestione errore fallita: "+e.getMessage(),e);
				}finally {
					ac.complete();
				}
			}

			/**
			 * Drains everything that can be read without blocking into the pipe or the buffer.
			 */
			@Override
			public void onDataAvailable() throws IOException {
				int len = -1;
				byte [] b = new byte[Utilities.DIMENSIONE_BUFFER];
				// isReady() must be checked before every read to stay in non-blocking mode
				while ( this.is.isReady() && (len = this.is.read(b)) != -1) {
					if(this.stream) {
						this.pipe.write(b, 0, len);
					}
					else {
						this.os.write(b, 0, len);
					}
				}
			}

			/**
			 * End of body: closes the pipe (stream mode) or invokes the service on the
			 * buffered body (buffered mode).
			 */
			@Override
			public void onAllDataRead() throws IOException {
				if(this.stream) {
					this.pipe.close();
				}
				else {
					this.os.flush();
					this.os.close();
					this.is.close();

					// Original note: start a thread on which wait/notify is later called during NIO delivery.
					IRicezioneService ricezioneService = newRicezioneService(this.generatoreErrore);
					this.httpIn.updateInputStream(new ByteArrayInputStream(this.os.toByteArray()));
					try{
						ricezioneService.process(this.httpIn, this.httpOut, dataAccettazioneRichiesta, ConnectorCostanti.ASYNC);
					}catch(Throwable e){
						String msgError = getServiceLabel()+".process error: "+e.getMessage();
						ConnectorUtils.getErrorLog().error(msgError,e);
						throw new IOException(msgError,e);
					}
				}
			}

		}).init(logCore, req.getInputStream(), stream, dimensioneBuffer,
				generatoreErrore,
				httpIn, httpOut) );

		// Timeouts and async I/O errors are recorded on the output message; the response itself is not touched here.
		ac.addListener(new AsyncListener() {

			private HttpServletConnectorAsyncOutMessage httpOut;

			public AsyncListener init(HttpServletConnectorAsyncOutMessage httpOut) {
				this.httpOut = httpOut;
				return this;
			}

			@Override
			public void onTimeout(AsyncEvent event) throws IOException {
				if(event.getThrowable()!=null) {
					this.httpOut.setNioException(new TimeoutIOException("Timeout '"+timeout+"' exceeded: "+event.getThrowable().getMessage(),event.getThrowable()));
				}
				else {
					this.httpOut.setNioException(new TimeoutIOException("Timeout '"+timeout+"' exceeded"));
				}
				/**((HttpServletResponse)ac.getResponse()).setStatus(500);*/
			}

			@Override
			public void onStartAsync(AsyncEvent event) throws IOException {
				// nop
			}

			@Override
			public void onError(AsyncEvent event) throws IOException {
				if(event.getThrowable()!=null) {
					this.httpOut.setNioException(new ConnectorException("Async IO error: "+event.getThrowable().getMessage(),event.getThrowable()));
				}
				else {
					this.httpOut.setNioException(new ConnectorException("Async IO error"));
				}
				/**((HttpServletResponse)ac.getResponse()).setStatus(500);*/
			}

			@Override
			public void onComplete(AsyncEvent event) throws IOException {
				/**((HttpServletResponse)ac.getResponse()).setStatus(500);*/
			}

		}.init(httpOut));
		ac.setTimeout(timeout);

		if(stream) {
			// Stream mode: the service starts right away on a worker pool and consumes the body from the pipe.
			IRicezioneService ricezioneService = newRicezioneService(generatoreErrore);
			Runnable runnable = new Runnable() {

				private IRicezioneService ricezioneService;
				private HttpServletConnectorInMessage httpIn;
				private HttpServletConnectorAsyncOutMessage httpOut;

				public Runnable init(IRicezioneService ricezioneService,
						HttpServletConnectorInMessage httpIn,
						HttpServletConnectorAsyncOutMessage httpOut) {
					this.ricezioneService = ricezioneService;
					this.httpIn = httpIn;
					this.httpOut = httpOut;
					return this;
				}

				@Override
				public void run() {
					try{
						this.ricezioneService.process(this.httpIn, this.httpOut, dataAccettazioneRichiesta, ConnectorCostanti.ASYNC);
					}catch(Throwable e){
						// report the actual service instead of a hard-coded name
						ConnectorUtils.getErrorLog().error("NIO "+getServiceLabel()+".process error: "+e.getMessage(),e);
					}
				}

			}.init(ricezioneService, httpIn, httpOut);

			boolean delegata = getIdService()!=null && getIdService().isPortaDelegata();
			if(delegata) {
				ConnectorApplicativeThreadPool.executeByAsyncOutRequestPool(runnable);
			}
			else {
				ConnectorApplicativeThreadPool.executeByAsyncInRequestPool(runnable);
			}
		}
	}

	/**
	 * Returns the label used in log and exception messages to identify the service:
	 * the value of {@link #getIdService()}, or {@link #getIdModulo()} when the service id is
	 * {@code null}, so that building an error message can never fail with a NullPointerException.
	 *
	 * @return the service label
	 */
	private String getServiceLabel() {
		IDService idService = getIdService();
		return idService!=null ? idService.getValue() : getIdModulo();
	}

	/**
	 * Logs an initialization failure on the connector error log and rethrows it.
	 *
	 * @param msg description of the failed step
	 * @param e cause of the failure
	 * @throws ServletException always, wrapping {@code e}
	 */
	private void doError(String msg, Exception e) throws ServletException {
		String msgError = msg+": "+e.getMessage();
		ConnectorUtils.getErrorLog().error(msgError,e);
		throw new ServletException(e.getMessage(),e);
	}
}