HttpServletConnectorAsyncOutMessage.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2026 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */
package org.openspcoop2.pdd.services.connector.messages;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.openspcoop2.message.OpenSPCoop2Message;
import org.openspcoop2.message.exception.MessageException;
import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
import org.openspcoop2.pdd.services.connector.AsyncResponseCallbackClientEvent;
import org.openspcoop2.pdd.services.connector.ConnectorException;
import org.openspcoop2.protocol.sdk.IProtocolFactory;
import org.openspcoop2.protocol.sdk.constants.IDService;
import org.openspcoop2.protocol.sdk.state.RequestInfo;
import org.openspcoop2.utils.io.DumpByteArrayOutputStream;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletResponse;

/**
 * HttpServletConnectorOutMessage
 *
 * @author Andrea Poli (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public class HttpServletConnectorAsyncOutMessage extends HttpServletConnectorOutMessage {

	protected AsyncContext asyncContext;	
	private CompletableFuture<Boolean> asyncWriteTask = null;
	private boolean flowStream = false;
	
	public HttpServletConnectorAsyncOutMessage(RequestInfo requestInfo,IProtocolFactory<?> protocolFactory, AsyncContext ac,
			IDService idModuloAsIDService, String idModulo) throws ConnectorException{
		super(requestInfo, protocolFactory, getHttpServletResponse(ac), idModuloAsIDService, idModulo);
		this.asyncContext = ac;
	}
	public static HttpServletResponse getHttpServletResponse(AsyncContext ac) {
		return (HttpServletResponse) ac.getResponse();
	}

	private Throwable nioException;
	public void setNioException(Throwable nioException) {
		this.nioException = nioException;
	}
		
	@Override
	protected void writeTo(OutputStream out, OpenSPCoop2Message msg, boolean consume) throws MessageException {
		try {
			this.asyncWriteTask = new CompletableFuture<>();
			if ( this.flowStream ) {
				this.res.getOutputStream().setWriteListener( new WriteListener() {
					
					@Override
					public void onWritePossible() throws IOException {
						try {
							msg.writeTo( out, consume );
							HttpServletConnectorAsyncOutMessage.this.asyncWriteTask.complete( true );
						} catch ( MessageException e ) {
							/**OpenSPCoop2Logger.getLoggerOpenSPCoopCore().error("Errore durante la scrittura della risposta asincrona: " + e.getMessage(), e);*/
							throw new IOException( e.getMessage(),e );
						}
					}
					
					@Override
					public void onError( Throwable t ) {
						HttpServletConnectorAsyncOutMessage.this.asyncWriteTask.complete( false );
						OpenSPCoop2Logger.getLoggerOpenSPCoopCore().error("Errore durante la consegna della risposta asincrona: " + t.getMessage(), t);
				        HttpServletConnectorAsyncOutMessage.this.asyncContext.complete();
					}
				});
			} else {
				super.writeTo(out, msg, consume);
			}
		} catch (Exception e) {
			doMessageErrorWriteTo(e);
		} finally {
			if ( !this.flowStream ) {
				this.asyncWriteTask.complete( true );
			}
		}
	}
	private void doMessageErrorWriteTo(Exception e) throws MessageException {
		OpenSPCoop2Logger.getLoggerOpenSPCoopCore().error("Errore durante la scrittura della risposta asincrona: " + e.getMessage(), e);
		throw new MessageException( e.getMessage(),e );
	}
	
	@Override
	public void sendResponse(DumpByteArrayOutputStream message) throws ConnectorException{
		try {
			this.asyncWriteTask = new CompletableFuture<>();
			HttpServletConnectorOutMessage httpServletConnectorOutMessage = this;
			if ( this.flowStream ) {
				this.res.getOutputStream().setWriteListener( new WriteListener() {
					
					@Override
					public void onWritePossible() throws IOException {
						try {
							httpServletConnectorOutMessage.sendResponseByBuffer(message);
							HttpServletConnectorAsyncOutMessage.this.asyncWriteTask.complete( true );
						} catch ( ConnectorException e ) {
							/**OpenSPCoop2Logger.getLoggerOpenSPCoopCore().error("Errore durante la scrittura della risposta asincrona (DumpByteArrayOutputStream): " + e.getMessage(), e);*/
							throw new IOException( e.getMessage(),e );
						}
					}
					
					@Override
					public void onError( Throwable t ) {
						HttpServletConnectorAsyncOutMessage.this.asyncWriteTask.complete( false );
						OpenSPCoop2Logger.getLoggerOpenSPCoopCore().error("Errore durante la consegna della risposta asincrona (DumpByteArrayOutputStream): " + t.getMessage(), t);
				        HttpServletConnectorAsyncOutMessage.this.asyncContext.complete();
					}
				});
			} else {
				httpServletConnectorOutMessage.sendResponseByBuffer(message);
			}
		} catch (Exception e) {
			doConnectionErrorWriteTo(e);
		} finally {
			if ( !this.flowStream ) {
				this.asyncWriteTask.complete( true );
			}
		}
	}
	private void doConnectionErrorWriteTo(Exception e) throws ConnectorException {
		OpenSPCoop2Logger.getLoggerOpenSPCoopCore().error("Errore durante la scrittura della risposta asincrona (DumpByteArrayOutputStream): " + e.getMessage(), e);
		throw new ConnectorException( e.getMessage(),e );
	}
	
	
	
	/**
	private void asyncWrite(OutputStream os, byte[] content) {
		jakarta.servlet.ServletOutputStream servletOutputStream = (jakarta.servlet.ServletOutputStream) os;
		AsyncWriteListener writeListener = new AsyncWriteListener(this._ac, content, 
				servletOutputStream, this.protocolFactory.getLogger());
		servletOutputStream.setWriteListener(writeListener);
	}
	*/
	
	@Override
	public void flush(boolean throwException) throws ConnectorException {
		try {
			if ( this.asyncWriteTask != null ) {
				boolean taskRes = this.asyncWriteTask.get();
				this.asyncWriteTask = null;
				if(!taskRes && throwException) {
					throw new ConnectorException("Response write uncomplete (flush)?");
				}
			}
		} catch (InterruptedException | ExecutionException e) {
			if(throwException) {
				Thread.currentThread().interrupt();
				throw new ConnectorException(e.getMessage(),e);
			}
		}
		if(this.nioException!=null && throwException) {
			throw new ConnectorException(this.nioException.getMessage(),this.nioException);
		}
		
		super.flush(throwException);
	}
	
	@Override
	public void close(AsyncResponseCallbackClientEvent clientEvent, boolean throwException) throws ConnectorException{
		
		super.close(clientEvent, throwException);
		
		try{
			if ( this.asyncWriteTask != null ) {
				boolean taskRes = this.asyncWriteTask.get();
				this.asyncWriteTask = null;
				if(!taskRes && throwException) {
					throw new ConnectorException("Response write uncomplete (close)?");
				}
			}
			
			if(this.asyncContext!=null) {
				asyncContextCompleteSafe(throwException);
			}
		}catch(Exception e){
			Thread.currentThread().interrupt();
			throw new ConnectorException(e.getMessage(),e);
		}	
	}
	private void asyncContextCompleteSafe(boolean throwException) {
		try{
			/**if(!AsyncResponseCallbackClientEvent.FAILED.equals(clientEvent)) {*/ 
			this.asyncContext.complete();
		}catch(Exception e){
			if(throwException){
				throw e;
			}
		}
	}
}

/**
class AsyncWriteListener implements jakarta.servlet.WriteListener {
	
	private AsyncContext ac;
	private int index = 0;
	private byte[] content;
	private jakarta.servlet.ServletOutputStream servletOutputStream;
	private org.slf4j.Logger log;
	
	public AsyncWriteListener(AsyncContext ac, byte[] content, jakarta.servlet.ServletOutputStream servletOutputStream, org.slf4j.Logger log){
		this.ac = ac;
		this.content = content;
		this.servletOutputStream = servletOutputStream;
		this.log = log;
	}
	
	@Override
	public void onWritePossible() throws java.io.IOException {
        while ( this.index < this.content.length && this.servletOutputStream.isReady() ) {
        	int toBeWrite = this.content.length - this.index;
        	if ( toBeWrite > 1024 )
        		toBeWrite = 1024;
        	this.servletOutputStream.write(this.content, this.index, toBeWrite);
        	this.index += toBeWrite;
        }
		if ( this.index >= this.content.length ) {
			this.ac.complete();
		}
	}

	@Override
	public void onError(Throwable t) {
        this.log.error( "Errore durante la consegna della risposta asincrona: "+t.getMessage(), t );
        this.ac.complete();
	}
}
*/