PipedInputOutputStreamHandler.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.utils.io.notifier.unblocked;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.openspcoop2.utils.io.notifier.StreamingHandler;
/**
* PipedInputOutputStreamHandler
*
* @author Poli Andrea (apoli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class PipedInputOutputStreamHandler implements StreamingHandler {
// Thread che si occupa di consumare i bytes disponibili forniti all'handler
// Implementa anche la gestione dello stream tra producer e consumer
private AbstractStreamingHandler streamingHandler;
// Submit ritornato dell'esecuzione del thread
private Future<ResultStreamingHandler> submitThreadExecution;
// Risultato dell'esecuzione del thread
private boolean retrieveResult = false;
private ResultStreamingHandler resultThreadExecution;
private String errorMessageThreadExecution;
private Throwable exceptionThreadExecution;
// Log
private Logger log;
// Esecutore del Thread
private ExecutorService executor;
// Informazione se lo stream is closed
private boolean closed;
// ID classe streamingHandler
private String idStreamingHandler;
public PipedInputOutputStreamHandler(String id, AbstractStreamingHandler streamingHandler, Logger log) throws Exception {
this.log = log;
//inizializzo la pipe di stream attraverso la quale eseguire la validazione
//this.out = new PipedOutputStream();
this.streamingHandler = streamingHandler;
this.idStreamingHandler = id;
// Creo esecutore del thread
this.executor = Executors.newSingleThreadExecutor();
// Avvio il thread
this.submitThreadExecution = this.executor.submit(this.streamingHandler);
}
// ** Metodi interfaccia StreamingHandler **
@Override
public String getID(){
return this.idStreamingHandler;
}
@Override
public void feed(byte b) throws IOException {
byte[]buffer = new byte[1];
this.feed(buffer);
}
@Override
public void feed(byte[] b) throws IOException {
//System.out.println("@@PIPE@@ feed ["+b.length+"] bytes ...");
try{
if(!this.closed) {
// in.isPrematureEnd() ci dice se il processo attuato dallo streaming handler e' gia' terminato
// b == -1 significa che lo stream รจ finito
if(this.streamingHandler.isPrematureEnd()){
//System.out.println("@@PIPE@@ feed ["+b.length+"] bytes: END");
this.end();
}else {
//System.out.println("@@PIPE@@ feed ["+b.length+"] bytes: WRITE");
this.streamingHandler.write(b);
}
}
}catch(Throwable e){
this.log.error("["+this.idStreamingHandler+"] feed error",e);
throw new IOException(e.getMessage());
}
}
@Override
public void end() throws IOException {
//System.out.println("@@PIPE@@ END");
closeResources();
}
@Override
public void closeResources() throws IOException {
//System.out.println("@@PIPE@@ CLOSE RESOURCES ...");
IOException ioException = null;
if(!this.closed) {
try {
// Chiudo input stream
this.streamingHandler.close();
// Fermo esecutore del thread
this.executor.shutdown();
try {
if (!this.executor.awaitTermination(20, TimeUnit.SECONDS)) {
this.executor.shutdownNow();
}
} catch (InterruptedException pCaught) {
this.executor.shutdownNow();
Thread.currentThread().interrupt();
}
}catch(Throwable e){
if(e!=null && e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
this.log.error("["+this.idStreamingHandler+"] end error",e);
ioException = new IOException(e.getMessage());
}
}
//System.out.println("@@PIPE@@ RETRIVE RESULT ...");
retrieveResult();
//System.out.println("@@PIPE@@ RETRIVED RESULT");
//System.out.println("@@PIPE@@ Release resource...");
this.releaseResource();
//System.out.println("@@PIPE@@ Release resource ok");
if(ioException!=null){
throw ioException;
}
}
private void retrieveResult() {
if(this.retrieveResult == false){
try {
// Recupero risultato
if(this.submitThreadExecution!=null){
//System.out.println("@@PIPE@@ RETRIVE RESULT submitThreadExecution.get()...");
this.resultThreadExecution = this.submitThreadExecution.get();
//System.out.println("@@PIPE@@ RETRIVE RESULT submitThreadExecution.get() ok");
}
// Recupero eventuali errori non lanciati nella exception del metodo call() (Utile negli scenari in cui l'errore viene capito in streaming all'interno del thread)
this.errorMessageThreadExecution = this.streamingHandler.getError();
this.exceptionThreadExecution = this.streamingHandler.getException();
}catch(Throwable e){
if(e !=null && e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
// Errori generati da this.submitThreadExecution.get()
// Cosi' facendo recupero eventuali errori lanciati nella exception del metodo call()
this.errorMessageThreadExecution = e.getMessage();
this.exceptionThreadExecution = e;
this.log.error("["+this.idStreamingHandler+"] end error",e);
// Non devo rilanciare gli errori, la logica e' rimandata a chi usa gli handler.
// Devo solo salvare gli errori
// throw new IOException(e.getMessage());
}
finally{
// Risultati letti
this.retrieveResult = true;
}
}
}
private void releaseResource() throws IOException{
try {
if(!this.closed) {
//this.out = null;
this.streamingHandler.close();
this.streamingHandler = null;
this.submitThreadExecution = null;
this.closed = true;
}
}catch(Exception e){
this.log.error("["+this.idStreamingHandler+"] closeResources error",e);
throw new IOException(e.getMessage());
}
}
// ** Metodi Recuperare informazioni elaborate dallo streaming handler **
private void finalizeResult() throws IOException{
// La close puo' non essere ancora stata effettuato:
// - puo' succedere che non sia stato chiamato l'end perche' il messaggio e' piccolo e viene gestito in una unica feed.
// - inoltre il metodo closeResources puo' essere invocato alla fine di tutto (es. dopo aver ritornato la risposta)
if(!this.closed) {
closeResources();
}
}
public ResultStreamingHandler getResult() throws IOException {
this.finalizeResult();
return this.resultThreadExecution;
}
public String getError() throws IOException {
this.finalizeResult();
return this.errorMessageThreadExecution;
}
public Throwable getException() throws IOException {
this.finalizeResult();
return this.exceptionThreadExecution;
}
}