PipedInputOutputStreamHandler.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */
package org.openspcoop2.utils.io.notifier.unblocked;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.openspcoop2.utils.io.notifier.StreamingHandler;

/**
 * PipedInputOutputStreamHandler
 *
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public class PipedInputOutputStreamHandler implements StreamingHandler {

	// Thread che si occupa di consumare i bytes disponibili forniti all'handler
	// Implementa anche la gestione dello stream tra producer e consumer
	private AbstractStreamingHandler streamingHandler;
	
	// Submit ritornato dell'esecuzione del thread
	private Future<ResultStreamingHandler> submitThreadExecution;
	
	// Risultato dell'esecuzione del thread
	private boolean retrieveResult = false;
	private ResultStreamingHandler resultThreadExecution;
	private String errorMessageThreadExecution;
	private Throwable exceptionThreadExecution;
	
	// Log
	private Logger log;
	
	// Esecutore del Thread
	private ExecutorService executor;
	
	// Informazione se lo stream is closed
	private boolean closed;
	
	// ID classe streamingHandler
	private String idStreamingHandler;
	
	public PipedInputOutputStreamHandler(String id, AbstractStreamingHandler streamingHandler, Logger log) throws Exception {
		
		this.log = log;
		
		//inizializzo la pipe di stream attraverso la quale eseguire la validazione
		//this.out = new PipedOutputStream();
		this.streamingHandler = streamingHandler;
		this.idStreamingHandler = id;
		
		// Creo esecutore del thread
		this.executor = Executors.newSingleThreadExecutor();
		
		// Avvio il thread
		this.submitThreadExecution = this.executor.submit(this.streamingHandler);
	}
	
	
	
	// ** Metodi interfaccia StreamingHandler **
	
	@Override
	public String getID(){
		return this.idStreamingHandler;
	}
	
	@Override
	public void feed(byte b) throws IOException {
		byte[]buffer = new byte[1];
		this.feed(buffer);
	}

	@Override
	public void feed(byte[] b) throws IOException {
		
		//System.out.println("@@PIPE@@ feed ["+b.length+"] bytes ...");
		
		try{
			if(!this.closed) {
				// in.isPrematureEnd() ci dice se il processo attuato dallo streaming handler e' gia' terminato
				// b == -1 significa che lo stream รจ finito
				if(this.streamingHandler.isPrematureEnd()){
					//System.out.println("@@PIPE@@ feed ["+b.length+"] bytes: END");
					this.end();
				}else {
					//System.out.println("@@PIPE@@ feed ["+b.length+"] bytes: WRITE");
					this.streamingHandler.write(b);
				}
			}
		}catch(Throwable e){
			this.log.error("["+this.idStreamingHandler+"] feed error",e);
			throw new IOException(e.getMessage());
		}
	}
	
	@Override
	public void end() throws IOException {
		
		//System.out.println("@@PIPE@@ END");
		closeResources();
		
	}
		
	@Override
	public void closeResources() throws IOException {
		
		//System.out.println("@@PIPE@@ CLOSE RESOURCES ...");
		IOException ioException = null;
		
		if(!this.closed) {
		
			try {
			
				// Chiudo input stream
				this.streamingHandler.close();
				
				// Fermo esecutore del thread
				this.executor.shutdown();				
				try {
	
					 if (!this.executor.awaitTermination(20, TimeUnit.SECONDS)) {
						 this.executor.shutdownNow();
					 }
				 } catch (InterruptedException pCaught) {	
					 this.executor.shutdownNow();
					 Thread.currentThread().interrupt();	
				 }
								
			}catch(Throwable e){
				if(e!=null && e instanceof InterruptedException) {
					Thread.currentThread().interrupt();	
				}
				this.log.error("["+this.idStreamingHandler+"] end error",e);
				ioException = new IOException(e.getMessage());
			}
		}
		
		//System.out.println("@@PIPE@@ RETRIVE RESULT ...");
		retrieveResult();
		//System.out.println("@@PIPE@@ RETRIVED RESULT");
		
		//System.out.println("@@PIPE@@ Release resource...");
		this.releaseResource(); 
		//System.out.println("@@PIPE@@ Release resource ok");
		
		if(ioException!=null){
			throw ioException;
		}
		
	}

	private void retrieveResult() {
		
		if(this.retrieveResult == false){
			try {
								
				// Recupero risultato
				if(this.submitThreadExecution!=null){
					//System.out.println("@@PIPE@@ RETRIVE RESULT submitThreadExecution.get()...");
					this.resultThreadExecution = this.submitThreadExecution.get();
					//System.out.println("@@PIPE@@ RETRIVE RESULT submitThreadExecution.get() ok");
				}
				
				// Recupero eventuali errori non lanciati nella exception del metodo call() (Utile negli scenari in cui l'errore viene capito in streaming all'interno del thread)
				this.errorMessageThreadExecution = this.streamingHandler.getError();
				this.exceptionThreadExecution = this.streamingHandler.getException();
								
			}catch(Throwable e){
				
				if(e !=null && e instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
				
				// Errori generati da this.submitThreadExecution.get()
				// Cosi' facendo recupero eventuali errori lanciati nella exception del metodo call()
				this.errorMessageThreadExecution = e.getMessage();
				this.exceptionThreadExecution = e;
				
				this.log.error("["+this.idStreamingHandler+"] end error",e);
				// Non devo rilanciare gli errori, la logica e' rimandata a chi usa gli handler.
				// Devo solo salvare gli errori
				// throw new IOException(e.getMessage());
			}
			finally{
				// Risultati letti
				this.retrieveResult = true;
			}
		}
	}
	
	private void releaseResource() throws IOException{
		try {
			if(!this.closed) {
				//this.out = null;
				this.streamingHandler.close();
				this.streamingHandler = null;
				this.submitThreadExecution = null;
				this.closed = true;
			}
		}catch(Exception e){
			this.log.error("["+this.idStreamingHandler+"] closeResources error",e);
			throw new IOException(e.getMessage());
		}
	}
	
	
	
	
	
	// ** Metodi Recuperare informazioni elaborate dallo streaming handler **
	
	private void finalizeResult() throws IOException{
		// La close puo' non essere ancora stata effettuato:
		// - puo' succedere che non sia stato chiamato l'end perche' il messaggio e' piccolo e viene gestito in una unica feed.
		// - inoltre il metodo closeResources puo' essere invocato alla fine di tutto (es. dopo aver ritornato la risposta)
		if(!this.closed) {
			closeResources();
		}
	}
	
	public ResultStreamingHandler getResult() throws IOException {
		this.finalizeResult();
		return this.resultThreadExecution;
	}

	public String getError() throws IOException {
		this.finalizeResult();
		return this.errorMessageThreadExecution;
	}
	
	public Throwable getException() throws IOException {
		this.finalizeResult();
		return this.exceptionThreadExecution;
	}

	



}