PipedUnblockedStream.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.utils.io.notifier.unblocked;

  21. import java.io.ByteArrayOutputStream;
  22. import java.io.IOException;
  23. import java.util.concurrent.CompletableFuture;
  24. import java.util.concurrent.TimeUnit;

  25. import org.openspcoop2.utils.SemaphoreLock;
  26. import org.openspcoop2.utils.Utilities;
  27. import org.slf4j.Logger;

  28. /**
  29.  * PipedUnblockedStream
  30.  *
  31.  * @author Poli Andrea (apoli@link.it)
  32.  * @author $Author$
  33.  * @version $Rev$, $Date$
  34.  */
  35. public class PipedUnblockedStream extends IPipedUnblockedStream {

  36.     protected Logger log = null;
  37.     private long sizeBuffer;
  38.     private int timeoutMs;
  39.    
  40.     @Override
  41.     public void init(Logger log, long sizeBuffer, int timeoutMs, String source) {
  42.         this.log = log;
  43.         // In memoria esistono 2 buffer,
  44.         // - [bytesReceived] uno che contiene i bytes gia' consolidati pronti a essere consumati
  45.         // - [bout] buffer utilizzato per scrivere i dati
  46.         // Quindi puo' succedere che entrambi i buffer siano "pieni". La dimensione massima richiesta in memoria viene quindi divisa per 2.
  47.         // NOTA: La dimensione di ogni buffer potra' essere this.sizeBuffer + eventualmente ;a dimensione del byte[] fornita con l'ultima write che ha superato il controllo di waitSpaceAvailable
  48.         if(sizeBuffer<=0) {
  49.             sizeBuffer = Utilities.DIMENSIONE_BUFFER;
  50.         }
  51.         this.sizeBuffer = sizeBuffer / 2;
  52.         this.timeoutMs = timeoutMs;
  53.         this.source = source;
  54.     }
  55.     @Override
  56.     public void setTimeout(int timeoutMs) {
  57.         this.timeoutMs = timeoutMs;
  58.     }
  59.    
  60.     //private final Integer semaphore = 1;
  61.     private final org.openspcoop2.utils.Semaphore lockPIPE = new org.openspcoop2.utils.Semaphore("PipedUnblockedStream");
  62.     private ByteArrayOutputStream bout = new ByteArrayOutputStream();
  63.     private byte [] bytesReceived = null;
  64.     private int indexNextByteReceivedForRead = -1;
  65.    
  66.     private boolean stop = false;

  67.     private boolean useThreadSleep = false;
  68.     private static final int ITERAZIONI_WAIT = 128;
  69.     private CompletableFuture<Boolean> asyncReadTask = null;
  70.     private CompletableFuture<Boolean> asyncWriteTask = null;
  71.    
  72.     private String source = null;
  73.     public String getPrefixSource() {
  74.         return this.source!=null ? "["+this.source+"] " : "";
  75.     }
  76.    
  77.     // INPUT STREAM
  78.    

  79.     private void readWaitBytes() throws IOException{
  80.         try {
  81.             if(this.useThreadSleep) {
  82.                 int i = 0;
  83.                 while(this.stop==false && this.bout!=null && this.bout.size()==0 && i<ITERAZIONI_WAIT){
  84.                     Utilities.sleep((i+1));
  85.                     i = i + i;
  86.                 }
  87.                 if(i>=ITERAZIONI_WAIT){
  88.                     throw new IOException(getPrefixSource()+"Timeout, no bytes available for read");
  89.                 }
  90.             }
  91.             else {
  92.                 boolean wait = false;
  93.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("readWaitBytes");
  94.                 try {
  95.                     if(this.stop==false && this.bout!=null && this.bout.size()==0) {
  96.                         this.asyncReadTask = new CompletableFuture<Boolean>();
  97.                         wait = true;
  98.                     }
  99.                 }finally {
  100.                     this.lockPIPE.release(lock, "readWaitBytes");
  101.                 }
  102.                
  103.                 if(wait) {
  104.                     try {
  105.                         //System.out.println("["+this.source+"] ASPETTO READ...");
  106.                         if(this.timeoutMs>0) {
  107.                             this.asyncReadTask.get(this.timeoutMs,TimeUnit.MILLISECONDS );
  108.                         }
  109.                         else {
  110.                             this.asyncReadTask.get();
  111.                         }
  112.                         //System.out.println("["+this.source+"] READ OK");
  113.                     }catch(InterruptedException timeout) {
  114.                         Thread.currentThread().interrupt();
  115.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  116.                     }
  117.                     catch(Exception timeout) {
  118.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  119.                     }
  120.                 }
  121.             }
  122.         }
  123.         catch(IOException io) {
  124.             throw io;
  125.         }
  126.         catch(Throwable t) {
  127.             if(t !=null && t instanceof InterruptedException) {
  128.                 Thread.currentThread().interrupt();
  129.             }
  130.             throw new IOException(t.getMessage(),t);
  131.         }
  132.     }
  133.    
  134.     @Override
  135.     public int read(byte[] b) throws IOException {
  136.        
  137.         return this.read(b, 0, b.length);
  138.        
  139.     }

  140.     @Override
  141.     public int read(byte[] b, int off, int len) throws IOException {
  142.        
  143.         try {
  144.        
  145.             //System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] .....");
  146.             //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] .....");
  147.            
  148.             if(this.bytesReceived==null){
  149.                 if(this.stop){      
  150.                     if(this.bout==null){
  151.                         //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] STOP BOUT NULL return -1");
  152.                         if(this.asyncWriteTask!=null) {
  153.                             //System.out.println("["+this.source+"] READ for WRITE COMPLETE 1");
  154.                             this.asyncWriteTask.complete(true);
  155.                         }
  156.                         return -1;
  157.                     }
  158.                     if(this.bout.size()<=0){
  159.                         this.bout.close();
  160.                         this.bout = null;
  161.                         //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] STOP BOUT SIZE<0 return -1");
  162.                         if(this.asyncWriteTask!=null) {
  163.                             //System.out.println("["+this.source+"] READ for WRITE COMPLETE 2");
  164.                             this.asyncWriteTask.complete(true);
  165.                         }
  166.                         return -1;
  167.                     }
  168.                 }
  169.                 else{
  170.                     if(this.bout.size()==0){
  171.                         //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] WAIT BYTES ...");
  172.                         readWaitBytes();
  173.                         if(this.bout==null) {
  174.                             // Viene reso null dal metodo close() che puo' essere chiamato mentre la read e' in corso
  175.                             //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] WAIT BYTES FOUND BOUT NULL ON EXIT");
  176.                             if(this.asyncWriteTask!=null) {
  177.                                 //System.out.println("["+this.source+"] READ for WRITE COMPLETE 3");
  178.                                 this.asyncWriteTask.complete(true);
  179.                             }
  180.                             return -1;
  181.                         }
  182.                     }
  183.                 }
  184.             }
  185.            
  186.             //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE ...");
  187.            
  188.             if(this.bytesReceived==null){
  189.                 if(this.stop){
  190.                     // garantita dal codice sopra essere not null quando si entra nell'if this.bytesReceived==null && this.stop
  191.                     /*if(this.bout==null){
  192.                         //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE RETURN -1");
  193.                         if(this.asyncWriteTask!=null) {
  194.                             //System.out.println("["+this.source+"] READ for WRITE COMPLETE 4");
  195.                             this.asyncWriteTask.complete(true);
  196.                         }
  197.                         return -1;
  198.                     }*/
  199.                     if(this.bout.size()<=0){
  200.                         this.bout.close();
  201.                         this.bout = null;
  202.                         //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE RETURN -1 (CASO B)");
  203.                         if(this.asyncWriteTask!=null) {
  204.                             //System.out.println("["+this.source+"] READ for WRITE COMPLETE 5");
  205.                             this.asyncWriteTask.complete(true);
  206.                         }
  207.                         return -1;
  208.                     }
  209.                 }
  210.             }
  211.            
  212.                    
  213.            
  214.            
  215.             if(this.bytesReceived==null){
  216.                 //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE FROM PRECEDENT BUFFERING IS NULL ...");
  217.                 //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC ...");
  218.                 //synchronized (this.semaphore) {
  219.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("read");
  220.                 try {
  221.                     //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A1 ...");
  222.                     this.bout.flush();
  223.                     //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A2 ...");
  224.                     this.bytesReceived = this.bout.toByteArray();
  225.                     //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A3 ...");
  226.                     this.indexNextByteReceivedForRead = 0;
  227.                     //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A4 ...");
  228.                     //System.out.println("########### RESET ATTUALE DIMENSIONE IN MEMORIA ["+this.bytesReceived.length+"]");
  229.                     this.bout.reset();
  230.                     if(this.asyncWriteTask!=null) {
  231.                         //System.out.println("["+this.source+"] READ for WRITE COMPLETE IN SEMAPHORE");
  232.                         this.asyncWriteTask.complete(true);
  233.                     }
  234.                 }finally {
  235.                     this.lockPIPE.release(lock, "read");
  236.                 }
  237.                 //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC OK");
  238.             }
  239.            
  240.             int bytesAvailableForRead = this.bytesReceived.length - this.indexNextByteReceivedForRead;
  241.             if(bytesAvailableForRead==len){        
  242.                 for (int i = 0; i < len; i++) {
  243.                     b[off+i]=this.bytesReceived[this.indexNextByteReceivedForRead];
  244.                     this.indexNextByteReceivedForRead++;
  245.                 }  
  246.                 this.bytesReceived = null;
  247.                 this.indexNextByteReceivedForRead = -1;
  248.                 //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] NEXT INDEX["+this.indexNextByteReceivedForRead+"] RETURN LETTI("+len+") EXIT A");
  249.                 return len;
  250.             }
  251.             else if(bytesAvailableForRead>len){
  252.                 int i = 0;
  253.                 for (i = 0; i < len; i++) {
  254.                     b[off+i]=this.bytesReceived[this.indexNextByteReceivedForRead];
  255.                     this.indexNextByteReceivedForRead++;
  256.                 }
  257.                 //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] NEXT INDEX["+this.indexNextByteReceivedForRead+"] RETURN LETTI("+len+") EXIT B");
  258.                 return len;
  259.             }
  260.             else{
  261.                 for (int i = 0; i < bytesAvailableForRead; i++) {
  262.                     b[off+i]=this.bytesReceived[this.indexNextByteReceivedForRead];
  263.                     this.indexNextByteReceivedForRead++;
  264.                 }
  265.                 this.bytesReceived = null;
  266.                 this.indexNextByteReceivedForRead = -1;
  267.                 //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] NEXT INDEX["+this.indexNextByteReceivedForRead+"] RETURN LETTI("+bytesAvailableForRead+") EXIT C");
  268.                 return bytesAvailableForRead;
  269.             }
  270.            
  271.            
  272.            
  273.     //      this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC ...");
  274.     //      byte[] buffer = null;
  275.     //      synchronized (this.semaphore) {
  276.     //          
  277.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A1 ...");
  278.     //          
  279.     //          this.bout.flush();
  280.     //          
  281.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A2 ...");
  282.     //          
  283.     //          buffer = this.bout.toByteArray();
  284.     //          
  285.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A3 ...");
  286.     //          
  287.     //          this.bout.reset();
  288.     //          
  289.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A4 ...");
  290.     //          
  291.     //          // Se il buffer possiede piu' bytes di quanti richiesti, vengono risalvati quelli che non verranno ritornati con questa chiamata.
  292.     //          if(buffer.length>len){
  293.     //              this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A5 ...");
  294.     //              this.bout.write(buffer, len, buffer.length-len);
  295.     //          }
  296.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC A6 ...");
  297.     //      }
  298.     //          
  299.     //      this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC OK");
  300.     //      
  301.     //      if(buffer.length==len){        
  302.     //          for (int i = 0; i < buffer.length; i++) {
  303.     //              b[off+i]=buffer[i];
  304.     //          }      
  305.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] EXIT A");
  306.     //          return b.length;
  307.     //      }
  308.     //      else if(buffer.length>len){
  309.     //          int i = 0;
  310.     //          for (i = 0; i < len; i++) {
  311.     //              b[off+i]=buffer[i];
  312.     //          }
  313.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] EXIT B");
  314.     //          return b.length;
  315.     //      }
  316.     //      else{
  317.     //          for (int i = 0; i < buffer.length; i++) {
  318.     //              b[off+i]=buffer[i];
  319.     //          }
  320.     //          this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] EXIT C");
  321.     //          return buffer.length;
  322.     //      }
  323.     //          
  324.         }
  325.         catch(IOException io) {
  326.             throw io;
  327.         }
  328.         catch(Throwable t) {
  329.             throw new IOException(t.getMessage(),t);
  330.         }
  331.     }

  332.     @Override
  333.     public int read() throws IOException {
  334.         byte[]b = new byte[1];
  335.         int len = this.read(b);
  336.         if ( len == 1 )
  337.             return b[0] & 0xFF;
  338.         if ( len == -1 )
  339.             return -1;
  340.         throw new IOException( "Cannot read single byte" );
  341.     }

  342.    
  343.     @Override
  344.     public void close() throws IOException {    
  345.         try {
  346.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("close");
  347.             try {
  348.                 if(this.stop==false){
  349.                     if(! (this.bout.size()>0) ){
  350.                         this.bout.flush();
  351.                         this.bout.close();
  352.                         //this.bout = null; se si annulla, la read lo trovera' null
  353.                     }
  354.                     this.stop = true;
  355.                 }
  356.             }finally{
  357.                 this.lockPIPE.release(lock, "close");
  358.             }
  359.             if(this.asyncWriteTask!=null) {
  360.                 //System.out.println("["+this.source+"] CLOSE for WRITE COMPLETE");
  361.                 this.asyncWriteTask.complete(true);
  362.             }
  363.             if(this.asyncReadTask!=null) {
  364.                 //System.out.println("["+this.source+"] CLOSE for READ COMPLETE");
  365.                 this.asyncReadTask.complete(true);
  366.             }
  367.            
  368.         }
  369.         catch(IOException io) {
  370.             throw io;
  371.         }
  372.         catch(Throwable t) {
  373.             throw new IOException(t.getMessage(),t);
  374.         }
  375.     }

  376.    
  377.     // ALIMENTAZIONE
  378.    
  379.     private void writeWaitEmptyBuffer() throws IOException{
  380.         try {
  381.             if(this.useThreadSleep) {
  382.                 if(this.bout.size()>this.sizeBuffer){
  383.                     int i = 0;
  384.                     while(this.stop==false && this.bout.size()>0 && i<ITERAZIONI_WAIT){
  385.                         Utilities.sleep((i+1));
  386.                         i = i + i;
  387.                     }
  388.                     if(i>=ITERAZIONI_WAIT){
  389.                         throw new IOException(getPrefixSource()+"Timeout, no buffer available for write");
  390.                     }
  391.                 }
  392.             }
  393.             else {
  394.                 boolean wait = false;
  395.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("writeWaitEmptyBuffer");
  396.                 try {
  397.                     if(this.bout.size()>this.sizeBuffer){
  398.                         if(this.stop==false && this.bout.size()>0 ) {
  399.                             this.asyncWriteTask = new CompletableFuture<Boolean>();
  400.                             wait = true;
  401.                         }
  402.                     }
  403.                 }finally{
  404.                     this.lockPIPE.release(lock, "writeWaitEmptyBuffer");
  405.                 }
  406.                 if(wait) {
  407.                     try {
  408.                         //System.out.println("["+this.source+"] ASPETTO WRITE ...");
  409.                         if(this.timeoutMs>0) {
  410.                             this.asyncWriteTask.get(this.timeoutMs,TimeUnit.MILLISECONDS );
  411.                         }
  412.                         else {
  413.                             this.asyncWriteTask.get();
  414.                         }
  415.                         //System.out.println("["+this.source+"] WRITE OK");
  416.                     }catch(InterruptedException timeout) {
  417.                         Thread.currentThread().interrupt();
  418.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  419.                     }
  420.                     catch(Exception timeout) {
  421.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  422.                     }
  423.                 }
  424.             }
  425.         }
  426.         catch(IOException io) {
  427.             throw io;
  428.         }
  429.         catch(Throwable t) {
  430.             if(t !=null && t instanceof InterruptedException) {
  431.                 Thread.currentThread().interrupt();
  432.             }
  433.             throw new IOException(t.getMessage(),t);
  434.         }
  435.     }
  436.    
  437.     @Override
  438.     public void write(byte b) throws IOException{
  439.        
  440.         try {
  441.        
  442.             //this.log.debug("########### WRITE byte .....");
  443.            
  444.             if(this.bout==null){
  445.                 throw new IOException(getPrefixSource()+"Stream already closed");
  446.             }
  447.            
  448.             this.writeWaitEmptyBuffer();
  449.                    
  450.             //this.log.debug("########### WRITE byte SYNC ...");
  451.             //synchronized (this.semaphore) {
  452.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b)");
  453.             try {
  454.                 this.bout.write(b);
  455.                 if(this.asyncReadTask!=null) {
  456.                     //System.out.println("["+this.source+"] WRITE for READ COMPLETE 1");
  457.                     this.asyncReadTask.complete(true);
  458.                 }
  459.                 //this.log.debug("########### WRITE byte SYNC OK");
  460.                 //System.out.println("########### WRITE byte SYNC OK ADD[1] SIZE_ATTUALE["+this.bout.size()+"]");
  461.             }finally{
  462.                 this.lockPIPE.release(lock, "write(b)");
  463.             }
  464.            
  465.         }
  466.         catch(IOException io) {
  467.             throw io;
  468.         }
  469.         catch(Throwable t) {
  470.             throw new IOException(t.getMessage(),t);
  471.         }

  472.     }
  473.    
  474.     @Override
  475.     public void write(byte [] b) throws IOException{
  476.        
  477.         try {
  478.        
  479.             //this.log.debug("########### WRITE byte ["+b.length+"] .....");
  480.            
  481.             if(this.bout==null){
  482.                 throw new IOException(getPrefixSource()+"Stream already closed");
  483.             }
  484.            
  485.             this.writeWaitEmptyBuffer();
  486.                    
  487.             //this.log.debug("########### WRITE byte ["+b.length+"] SYNC ...");
  488.             //synchronized (this.semaphore) {
  489.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b[])");
  490.             try {
  491.                 this.bout.write(b);
  492.                 if(this.asyncReadTask!=null) {
  493.                     //System.out.println("["+this.source+"] WRITE for READ COMPLETE 2");
  494.                     this.asyncReadTask.complete(true);
  495.                 }
  496.                 //this.log.debug("########### WRITE byte ["+b.length+"] SYNC OK");
  497.                 //System.out.println("########### WRITE byte SYNC OK ADD["+b.length+"] SIZE_ATTUALE["+this.bout.size()+"]");
  498.             }finally {
  499.                 this.lockPIPE.release(lock, "write(b[])");
  500.             }
  501.            
  502.         }
  503.         catch(IOException io) {
  504.             throw io;
  505.         }
  506.         catch(Throwable t) {
  507.             throw new IOException(t.getMessage(),t);
  508.         }

  509.     }
  510.    
  511.     @Override
  512.     public void write(byte[] b, int off, int len) throws IOException{
  513.        
  514.         try {
  515.        
  516.             //this.log.debug("########### WRITE byte ["+b.length+"] off:"+off+" len:"+len+" .....");
  517.            
  518.             if(this.bout==null){
  519.                 throw new IOException(getPrefixSource()+"Stream already closed");
  520.             }
  521.            
  522.             this.writeWaitEmptyBuffer();
  523.                    
  524.             //this.log.debug("########### WRITE byte ["+b.length+"] off:"+off+" len:"+len+" SYNC ...");
  525.             //synchronized (this.semaphore) {
  526.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b[],off,len)");
  527.             try {
  528.                 this.bout.write(b, off, len);
  529.                 if(this.asyncReadTask!=null) {
  530.                     //System.out.println("["+this.source+"] WRITE for READ COMPLETE 3");
  531.                     this.asyncReadTask.complete(true);
  532.                 }
  533.                 //this.log.debug("########### WRITE byte ["+b.length+"] off:"+off+" len:"+len+" SYNC OK");
  534.                 //System.out.println("########### WRITE byte SYNC OK ADD["+b.length+"] SIZE_ATTUALE["+this.bout.size()+"]");
  535.             }finally {
  536.                 this.lockPIPE.release(lock, "write(b[],off,len)");
  537.             }
  538.            
  539.         }
  540.         catch(IOException io) {
  541.             throw io;
  542.         }
  543.         catch(Throwable t) {
  544.             throw new IOException(t.getMessage(),t);
  545.         }

  546.     }

  547. }