ConnettoreHTTPCOREInputStreamEntityConsumer.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2025 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */
package org.openspcoop2.pdd.core.connettori.httpcore5.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.openspcoop2.pdd.core.connettori.ConnettoreLogger;
import org.openspcoop2.pdd.services.connector.ConnectorApplicativeThreadPool;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.UtilsRuntimeException;
import org.openspcoop2.utils.io.notifier.unblocked.IPipedUnblockedStream;
import org.openspcoop2.utils.io.notifier.unblocked.PipedUnblockedStreamFactory;
import org.openspcoop2.utils.transport.http.HttpRequestMethod;

/**
 * ConnettoreHTTPCOREInputStreamEntityConsumer
 * 
 * Asynchronous response consumer that exposes the response body as a stream:
 * the bytes received through {@link #consume(ByteBuffer)} are written into an
 * {@link IPipedUnblockedStream}, which is set as the entity of the
 * {@link ConnettoreHTTPCOREResponse}.
 * 
 * The result callback received in {@code consumeResponse} is dispatched at most once,
 * on one of the {@link ConnectorApplicativeThreadPool} response pools:
 * right after the response head for HEAD requests, when the first body bytes arrive,
 * or at stream end when no body bytes were received.
 *
 * @author Andrea Poli (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public class ConnettoreHTTPCOREInputStreamEntityConsumer implements AsyncResponseConsumer<ConnettoreHTTPCOREResponse>{

	// --- per-exchange state ---
	private ConnettoreHTTPCOREResponse res = null;
	private ContentType ct = null;
	private IPipedUnblockedStream stream = null;
	private FutureCallback<ConnettoreHTTPCOREResponse> callback;
	private long count = 0;
	private boolean complete = false;

	// --- configuration, fixed at construction time ---
	private final boolean delegata;
	private final ConnettoreLogger logger;
	private final int sizeBuffer;
	private final Integer readTimeout;
	private final HttpRequestMethod method;
	
	/**
	 * Builds a consumer without information about the request method
	 * (the early callback dispatch reserved to HEAD requests is therefore never applied).
	 *
	 * @param logger logger used for the piped stream and for error reporting
	 * @param sizeBuffer buffer size handed to the piped stream factory
	 * @param readTimeout read timeout handed to the piped stream factory
	 * @param delegata if true the callback runs on the "async in" response pool, otherwise on the "async out" one
	 */
	public ConnettoreHTTPCOREInputStreamEntityConsumer(ConnettoreLogger logger, int sizeBuffer, int readTimeout, boolean delegata) {
		this((HttpRequestMethod) null, logger, sizeBuffer, readTimeout, delegata);
	}
	
	/**
	 * Builds a consumer aware of the request method.
	 *
	 * @param method request method; may be null. When it is HEAD the callback is dispatched as soon as the response head is received
	 * @param logger logger used for the piped stream and for error reporting
	 * @param sizeBuffer buffer size handed to the piped stream factory
	 * @param readTimeout read timeout handed to the piped stream factory
	 * @param delegata if true the callback runs on the "async in" response pool, otherwise on the "async out" one
	 */
	public ConnettoreHTTPCOREInputStreamEntityConsumer(HttpRequestMethod method, ConnettoreLogger logger, int sizeBuffer, int readTimeout, boolean delegata) {
		this.logger = logger;
		this.sizeBuffer = sizeBuffer;
		this.readTimeout = readTimeout;
		this.delegata = delegata;
		this.method = method;
	}
	
	/**
	 * Schedules the completion of the pending callback (if any) on the proper
	 * response pool and then forgets it, so that it is dispatched only once.
	 */
	private void invokeCallback() {
		if(this.callback==null) {
			return;
		}
		
		// Capture the current values: the fields may change before the task actually runs.
		final FutureCallback<ConnettoreHTTPCOREResponse> pendingCallback = this.callback;
		final ConnettoreHTTPCOREResponse response = this.res;
		final Runnable task = () -> pendingCallback.completed(response);
		
		if(this.delegata) {
			ConnectorApplicativeThreadPool.executeByAsyncInResponsePool(task);
		}
		else {
			ConnectorApplicativeThreadPool.executeByAsyncOutResponsePool(task);
		}

		this.callback = null;
	}
	
	@Override
	// Triggered to signal receipt of an intermediate (1xx) HTTP response
	public void informationResponse(HttpResponse res, HttpContext context) throws HttpException, IOException {
		// intermediate responses are ignored
	}
	
	@Override
	// Triggered to signal receipt of a response message head.
	public void consumeResponse(HttpResponse res, EntityDetails entityDetails, HttpContext context,
			FutureCallback<ConnettoreHTTPCOREResponse> callback) throws HttpException, IOException {
		if(entityDetails!=null && entityDetails.getContentType()!=null) {
			this.ct = ContentType.parse(entityDetails.getContentType());
		}
		this.res = new ConnettoreHTTPCOREResponse(res);
		this.callback = callback;
		
		// For HEAD requests the callback is dispatched right away, without waiting for body bytes.
		if (HttpRequestMethod.HEAD.equals(this.method)) {
			invokeCallback();
		}
	}
	
	@Override
	// Triggered to pass incoming data to the data consumer
	public void consume(ByteBuffer bb) throws IOException {
		if(bb==null || !bb.hasRemaining()) {
			return;
		}
		
		// The piped stream is created lazily, on the first chunk of body bytes.
		if(this.stream==null) {
			this.stream = PipedUnblockedStreamFactory.newPipedUnblockedStream(this.logger.getLogger(), this.sizeBuffer, this.readTimeout, "Response");
			this.res.setEntity(new InputStreamEntity(this.stream, this.ct));
		}
		
		// Dispatch the callback (only the first time) before writing the bytes into the stream.
		if(this.callback!=null) {
			invokeCallback();
		}
		
		// A single bulk get drains the whole buffer.
		final byte[] chunk = new byte[bb.remaining()];
		bb.get(chunk);
		this.stream.write(chunk);
		this.count += chunk.length;
	}
	
	@Override
	// Triggered to signal ability of the underlying data stream to receive data capacity update
	public void updateCapacity(CapacityChannel channel) throws IOException {
		channel.update(Utilities.DIMENSIONE_BUFFER);
	}

	/**
	 * Marks the exchange as complete and closes the piped stream, if one was created.
	 * 
	 * The reference to the stream is dropped before closing it: if the close fails,
	 * a later invocation (e.g. from {@link #releaseResources()}) does not try to close
	 * the same stream again.
	 *
	 * @throws UtilsRuntimeException if closing the stream fails
	 */
	private void streamEnd() {
		this.complete = true;
		final IPipedUnblockedStream current = this.stream;
		if(current==null) {
			return;
		}
		this.stream = null;
		try {
			current.close();
		}catch(Exception e) {
			throw new UtilsRuntimeException(e.getMessage(),e);
		}
	}
	
	@Override
	// Triggered to signal termination of the data stream.
	public void streamEnd(List<? extends Header> list) throws HttpException, IOException {
		// No body bytes were received and the callback is still pending: dispatch it now.
		if(this.callback!=null) {
			invokeCallback();
		}
		
		// Report the total number of body bytes received on the response.
		if(this.count>0 && this.res!=null) {
			this.res.setCount(this.count);
		}

		streamEnd();
	}

	@Override
	public void releaseResources() {
		streamEnd();
	}

	@Override
	// Triggered to signal a failure in data processing
	public void failed(Exception exception) {
		
		/*
		 * This method is also invoked when the connection is released to the pool.
		 * The exception must not be reported if the response has already been handled correctly.
		 * Example of stacktrace:
			java.io.InterruptedIOException
			at deployment.govway.ear//org.apache.hc.client5.http.impl.async.HttpAsyncMainClientExec$1.cancel(HttpAsyncMainClientExec.java:129)
			at deployment.govway.ear//org.apache.hc.client5.http.impl.async.InternalHttpAsyncExecRuntime$3.cancel(InternalHttpAsyncExecRuntime.java:267)
			at deployment.govway.ear//org.apache.hc.core5.concurrent.ComplexFuture.setDependency(ComplexFuture.java:55)
			   ...
			at deployment.govway.ear//org.apache.hc.client5.http.impl.async.InternalHttpAsyncExecRuntime$1.completed(InternalHttpAsyncExecRuntime.java:114)
			 ....
			at deployment.govway.ear//org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager$1.leaseCompleted(PoolingAsyncClientConnectionManager.java:240)
			at deployment.govway.ear//org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager$1.completed(PoolingAsyncClientConnectionManager.java:277)
			at deployment.govway.ear//org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager$1.completed(PoolingAsyncClientConnectionManager.java:226)
			at deployment.govway.ear//org.apache.hc.core5.concurrent.BasicFuture.completed(BasicFuture.java:123)
			at deployment.govway.ear//org.apache.hc.core5.pool.LaxConnPool$PerRoutePool.lease(LaxConnPool.java:496)
			at deployment.govway.ear//org.apache.hc.core5.pool.LaxConnPool.lease(LaxConnPool.java:165)
			at deployment.govway.ear//org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager.lease(PoolingAsyncClientConnectionManager.java:225)
			at deployment.govway.ear//org.apache.hc.client5.http.impl.async.InternalHttpAsyncExecRuntime.acquireEndpoint(InternalHttpAsyncExecRuntime.java:100)
			.....
			at deployment.govway.ear//org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:107)
			at deployment.govway.ear//org.openspcoop2.pdd.core.connettori.nio.ConnettoreHTTPCORE5.sendHTTP(ConnettoreHTTPCORE5.java:505)
		 **/
		
		if(!this.complete) {
		
			/*
			 * TODO: this exception should be handled by invoking the callback if it has not been invoked yet,
			 * or by raising it on the input stream otherwise. At the moment it is only logged.
			 * NOTE(review): a failure occurring after the callback has been dispatched presumably reaches
			 * the reader of the stream only as a plain end of stream - confirm against IPipedUnblockedStream.
			 */
			this.logger.error("======== exception: " + this.stream + " - " + this.count+" : "+exception.getMessage(),exception);
		}
	}

}