PipedInputOutputStreamHandler.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.utils.io.notifier.unblocked;
  21. import java.io.IOException;
  22. import java.util.concurrent.ExecutorService;
  23. import java.util.concurrent.Executors;
  24. import java.util.concurrent.Future;
  25. import java.util.concurrent.TimeUnit;

  26. import org.slf4j.Logger;
  27. import org.openspcoop2.utils.io.notifier.StreamingHandler;

  28. /**
  29.  * PipedInputOutputStreamHandler
  30.  *
  31.  * @author Poli Andrea (apoli@link.it)
  32.  * @author $Author$
  33.  * @version $Rev$, $Date$
  34.  */
  35. public class PipedInputOutputStreamHandler implements StreamingHandler {

  36.     // Thread che si occupa di consumare i bytes disponibili forniti all'handler
  37.     // Implementa anche la gestione dello stream tra producer e consumer
  38.     private AbstractStreamingHandler streamingHandler;
  39.    
  40.     // Submit ritornato dell'esecuzione del thread
  41.     private Future<ResultStreamingHandler> submitThreadExecution;
  42.    
  43.     // Risultato dell'esecuzione del thread
  44.     private boolean retrieveResult = false;
  45.     private ResultStreamingHandler resultThreadExecution;
  46.     private String errorMessageThreadExecution;
  47.     private Throwable exceptionThreadExecution;
  48.    
  49.     // Log
  50.     private Logger log;
  51.    
  52.     // Esecutore del Thread
  53.     private ExecutorService executor;
  54.    
  55.     // Informazione se lo stream is closed
  56.     private boolean closed;
  57.    
  58.     // ID classe streamingHandler
  59.     private String idStreamingHandler;
  60.    
  61.     public PipedInputOutputStreamHandler(String id, AbstractStreamingHandler streamingHandler, Logger log) throws Exception {
  62.        
  63.         this.log = log;
  64.        
  65.         //inizializzo la pipe di stream attraverso la quale eseguire la validazione
  66.         //this.out = new PipedOutputStream();
  67.         this.streamingHandler = streamingHandler;
  68.         this.idStreamingHandler = id;
  69.        
  70.         // Creo esecutore del thread
  71.         this.executor = Executors.newSingleThreadExecutor();
  72.        
  73.         // Avvio il thread
  74.         this.submitThreadExecution = this.executor.submit(this.streamingHandler);
  75.     }
  76.    
  77.    
  78.    
  79.     // ** Metodi interfaccia StreamingHandler **
  80.    
  81.     @Override
  82.     public String getID(){
  83.         return this.idStreamingHandler;
  84.     }
  85.    
  86.     @Override
  87.     public void feed(byte b) throws IOException {
  88.         byte[]buffer = new byte[1];
  89.         this.feed(buffer);
  90.     }

  91.     @Override
  92.     public void feed(byte[] b) throws IOException {
  93.        
  94.         //System.out.println("@@PIPE@@ feed ["+b.length+"] bytes ...");
  95.        
  96.         try{
  97.             if(!this.closed) {
  98.                 // in.isPrematureEnd() ci dice se il processo attuato dallo streaming handler e' gia' terminato
  99.                 // b == -1 significa che lo stream รจ finito
  100.                 if(this.streamingHandler.isPrematureEnd()){
  101.                     //System.out.println("@@PIPE@@ feed ["+b.length+"] bytes: END");
  102.                     this.end();
  103.                 }else {
  104.                     //System.out.println("@@PIPE@@ feed ["+b.length+"] bytes: WRITE");
  105.                     this.streamingHandler.write(b);
  106.                 }
  107.             }
  108.         }catch(Throwable e){
  109.             this.log.error("["+this.idStreamingHandler+"] feed error",e);
  110.             throw new IOException(e.getMessage());
  111.         }
  112.     }
  113.    
  114.     @Override
  115.     public void end() throws IOException {
  116.        
  117.         //System.out.println("@@PIPE@@ END");
  118.         closeResources();
  119.        
  120.     }
  121.        
  122.     @Override
  123.     public void closeResources() throws IOException {
  124.        
  125.         //System.out.println("@@PIPE@@ CLOSE RESOURCES ...");
  126.         IOException ioException = null;
  127.        
  128.         if(!this.closed) {
  129.        
  130.             try {
  131.            
  132.                 // Chiudo input stream
  133.                 this.streamingHandler.close();
  134.                
  135.                 // Fermo esecutore del thread
  136.                 this.executor.shutdown();              
  137.                 try {
  138.    
  139.                      if (!this.executor.awaitTermination(20, TimeUnit.SECONDS)) {
  140.                          this.executor.shutdownNow();
  141.                      }
  142.                  } catch (InterruptedException pCaught) {  
  143.                      this.executor.shutdownNow();
  144.                      Thread.currentThread().interrupt();    
  145.                  }
  146.                                
  147.             }catch(Throwable e){
  148.                 if(e!=null && e instanceof InterruptedException) {
  149.                     Thread.currentThread().interrupt();
  150.                 }
  151.                 this.log.error("["+this.idStreamingHandler+"] end error",e);
  152.                 ioException = new IOException(e.getMessage());
  153.             }
  154.         }
  155.        
  156.         //System.out.println("@@PIPE@@ RETRIVE RESULT ...");
  157.         retrieveResult();
  158.         //System.out.println("@@PIPE@@ RETRIVED RESULT");
  159.        
  160.         //System.out.println("@@PIPE@@ Release resource...");
  161.         this.releaseResource();
  162.         //System.out.println("@@PIPE@@ Release resource ok");
  163.        
  164.         if(ioException!=null){
  165.             throw ioException;
  166.         }
  167.        
  168.     }

  169.     private void retrieveResult() {
  170.        
  171.         if(this.retrieveResult == false){
  172.             try {
  173.                                
  174.                 // Recupero risultato
  175.                 if(this.submitThreadExecution!=null){
  176.                     //System.out.println("@@PIPE@@ RETRIVE RESULT submitThreadExecution.get()...");
  177.                     this.resultThreadExecution = this.submitThreadExecution.get();
  178.                     //System.out.println("@@PIPE@@ RETRIVE RESULT submitThreadExecution.get() ok");
  179.                 }
  180.                
  181.                 // Recupero eventuali errori non lanciati nella exception del metodo call() (Utile negli scenari in cui l'errore viene capito in streaming all'interno del thread)
  182.                 this.errorMessageThreadExecution = this.streamingHandler.getError();
  183.                 this.exceptionThreadExecution = this.streamingHandler.getException();
  184.                                
  185.             }catch(Throwable e){
  186.                
  187.                 if(e !=null && e instanceof InterruptedException) {
  188.                     Thread.currentThread().interrupt();
  189.                 }
  190.                
  191.                 // Errori generati da this.submitThreadExecution.get()
  192.                 // Cosi' facendo recupero eventuali errori lanciati nella exception del metodo call()
  193.                 this.errorMessageThreadExecution = e.getMessage();
  194.                 this.exceptionThreadExecution = e;
  195.                
  196.                 this.log.error("["+this.idStreamingHandler+"] end error",e);
  197.                 // Non devo rilanciare gli errori, la logica e' rimandata a chi usa gli handler.
  198.                 // Devo solo salvare gli errori
  199.                 // throw new IOException(e.getMessage());
  200.             }
  201.             finally{
  202.                 // Risultati letti
  203.                 this.retrieveResult = true;
  204.             }
  205.         }
  206.     }
  207.    
  208.     private void releaseResource() throws IOException{
  209.         try {
  210.             if(!this.closed) {
  211.                 //this.out = null;
  212.                 this.streamingHandler.close();
  213.                 this.streamingHandler = null;
  214.                 this.submitThreadExecution = null;
  215.                 this.closed = true;
  216.             }
  217.         }catch(Exception e){
  218.             this.log.error("["+this.idStreamingHandler+"] closeResources error",e);
  219.             throw new IOException(e.getMessage());
  220.         }
  221.     }
  222.    
  223.    
  224.    
  225.    
  226.    
  227.     // ** Metodi Recuperare informazioni elaborate dallo streaming handler **
  228.    
  229.     private void finalizeResult() throws IOException{
  230.         // La close puo' non essere ancora stata effettuato:
  231.         // - puo' succedere che non sia stato chiamato l'end perche' il messaggio e' piccolo e viene gestito in una unica feed.
  232.         // - inoltre il metodo closeResources puo' essere invocato alla fine di tutto (es. dopo aver ritornato la risposta)
  233.         if(!this.closed) {
  234.             closeResources();
  235.         }
  236.     }
  237.    
  238.     public ResultStreamingHandler getResult() throws IOException {
  239.         this.finalizeResult();
  240.         return this.resultThreadExecution;
  241.     }

  242.     public String getError() throws IOException {
  243.         this.finalizeResult();
  244.         return this.errorMessageThreadExecution;
  245.     }
  246.    
  247.     public Throwable getException() throws IOException {
  248.         this.finalizeResult();
  249.         return this.exceptionThreadExecution;
  250.     }

  251.    



  252. }