PipedBytesStream.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.utils.io.notifier.unblocked;

  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import java.util.concurrent.CompletableFuture;
  25. import java.util.concurrent.TimeUnit;

  26. import org.openspcoop2.utils.SemaphoreLock;
  27. import org.openspcoop2.utils.Utilities;
  28. import org.slf4j.Logger;

  29. /**
  30.  * PipedBytesStream
  31.  *
  32.  * @author $Author$
  33.  * @version $Rev$, $Date$
  34.  */
  35. public class PipedBytesStream extends IPipedUnblockedStream {
  36.     private static final int MAX_QUEUE = 2;

  37.     protected Logger log = null;
  38.     private long sizeBuffer;
  39.     private int timeoutMs;
  40.    
  41.     @Override
  42.     public void init(Logger log, long sizeBuffer, int timeoutMs, String source) {
  43.         this.log = log;
  44.         // In memoria esistono 2 buffer,
  45.         // - [bytesReceived] uno che contiene i bytes gia' consolidati pronti a essere consumati
  46.         // - [bytesReading] buffer utilizzato per scrivere i dati
  47.         // Quindi puo' succedere che entrambi i buffer siano "pieni". La dimensione massima richiesta in memoria viene quindi divisa per 2.
  48.         // NOTA: La dimensione di ogni buffer potra' essere this.sizeBuffer + eventualmente ;a dimensione del byte[] fornita con l'ultima write che ha superato il controllo di waitSpaceAvailable
  49.         if(sizeBuffer<=0) {
  50.             sizeBuffer = Utilities.DIMENSIONE_BUFFER;
  51.         }
  52.         this.sizeBuffer = sizeBuffer / 2;
  53.         this.timeoutMs = timeoutMs;
  54.         this.source = source;
  55.         initReadingBuffer();
  56.     }
  57.     @Override
  58.     public void setTimeout(int timeoutMs) {
  59.         this.timeoutMs = timeoutMs;
  60.     }
  61.    
  62.     //private final Integer semaphore = 1;
  63.     private final org.openspcoop2.utils.Semaphore lockPIPE = new org.openspcoop2.utils.Semaphore("PipedBytesStream");
  64.     private byte [] bytesReading = null;
  65.     //private volatile int indexNextByteReceivedForWrite = 0;
  66.     private int indexNextByteReceivedForWrite = 0;
  67.     private List<byte[]> chunkList = new ArrayList<byte[]>();
  68.     private byte [] bytesReceived = null;
  69.     //private volatile int indexNextByteReceivedForRead = -1;
  70.     private int indexNextByteReceivedForRead = -1;
  71.    
  72.     private boolean stop = false;

  73.     private boolean useThreadSleep = false;
  74.     private static final int ITERAZIONI_WAIT = 128;
  75.     private CompletableFuture<Boolean> asyncReadTask = null;
  76.     private CompletableFuture<Boolean> asyncWriteTask = null;

  77.     private void initReadingBuffer() {
  78.         this.bytesReading = new byte[ (int)this.sizeBuffer ];
  79.         this.indexNextByteReceivedForWrite = 0;
  80.     }

  81.     private String source = null;
  82.     public String getPrefixSource() {
  83.         return this.source!=null ? "["+this.source+"] " : "";
  84.     }

  85.     private boolean readBytesPending() {
  86.         return (this.indexNextByteReceivedForWrite > 0 || this.chunkList.size() > 0);
  87.     }

  88.     // INPUT STREAM
  89.    

  90.     private void readWaitBytes() throws IOException{
  91.         try {
  92.             if(this.useThreadSleep) {
  93.                 int i = 0;
  94.                 while(this.stop==false && !readBytesPending() && i<ITERAZIONI_WAIT){
  95.                     Utilities.sleep((i+1));
  96.                     i = i + i;
  97.                 }
  98.                 if(i>=ITERAZIONI_WAIT){
  99.                     throw new IOException(getPrefixSource()+"Timeout, no bytes available for read");
  100.                 }
  101.             } else {
  102.                 boolean wait = false;
  103.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("readWaitBytes");
  104.                 try {
  105.                     if(this.stop==false && !readBytesPending()) {
  106.                         this.asyncReadTask = new CompletableFuture<Boolean>();
  107.                         wait = true;
  108.                     }
  109.                 }finally {
  110.                     this.lockPIPE.release(lock, "readWaitBytes");
  111.                 }
  112.                
  113.                 if(wait) {
  114.                     try {
  115.     //                  System.out.println("["+this.source+"] ASPETTO READ...");
  116.                         if(this.timeoutMs>0) {
  117.                             this.asyncReadTask.get(this.timeoutMs,TimeUnit.MILLISECONDS );
  118.                         }
  119.                         else {
  120.                             this.asyncReadTask.get();
  121.                         }
  122.     //                  System.out.println("["+this.source+"] READ OK");
  123.                     }catch (InterruptedException timeout) {
  124.                         Thread.currentThread().interrupt();
  125.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  126.                     }catch(Exception timeout) {
  127.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  128.                     }
  129.                 }
  130.             }
  131.         }
  132.         catch(IOException io) {
  133.             throw io;
  134.         }
  135.         catch(Throwable t) {
  136.             if(t !=null && t instanceof InterruptedException) {
  137.                 Thread.currentThread().interrupt();
  138.             }
  139.             throw new IOException(t.getMessage(),t);
  140.         }
  141.     }
  142.    
  143.     @Override
  144.     public int read(byte[] b) throws IOException {
  145.        
  146.         return this.read(b, 0, b.length);
  147.        
  148.     }

  149.     @Override
  150.     public int read(byte[] b, int off, int len) throws IOException {
  151.        
  152.         try {
  153.            
  154.     //      System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] .....");
  155.             //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] .....");
  156.            
  157.             if(this.bytesReceived==null){
  158.                 if (this.stop) {        
  159.                     if(!readBytesPending()){
  160.     //                  System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] STOP BOUT NULL return -1");
  161.                         if(this.asyncWriteTask!=null) {
  162.     //                      System.out.println("["+this.source+"] READ for WRITE COMPLETE 1");
  163.                             this.asyncWriteTask.complete(true);
  164.                         }
  165.                         return -1;
  166.                     }
  167.                 } else {
  168.                     if(!readBytesPending()){
  169.     //                  System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] WAIT BYTES ...");
  170.                         readWaitBytes();
  171.                         if (!readBytesPending()) {
  172.                             // Viene reso null dal metodo close() che puo' essere chiamato mentre la read e' in corso
  173.     //                      System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] WAIT BYTES FOUND BOUT NULL ON EXIT");
  174.                             if(this.asyncWriteTask!=null) {
  175.     //                          System.out.println("["+this.source+"] READ for WRITE COMPLETE 3");
  176.                                 this.asyncWriteTask.complete(true);
  177.                             }
  178.                             return -1;
  179.                         }
  180.                     }
  181.                 }
  182.             }
  183.            
  184.             //this.log.debug("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE ...");
  185.            
  186.             if(this.bytesReceived==null){
  187.                 if(this.stop){
  188.                     if(!readBytesPending()){
  189.     //                  System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE RETURN -1");
  190.                         if(this.asyncWriteTask!=null) {
  191.     //                      System.out.println("["+this.source+"] READ for WRITE COMPLETE 4");
  192.                             this.asyncWriteTask.complete(true);
  193.                         }
  194.                         return -1;
  195.                     }
  196.                 }
  197.             }
  198.            
  199.                    
  200.            
  201.            
  202.             if(this.bytesReceived==null){
  203.     //          System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] BYTES AVAILABLE FROM PRECEDENT BUFFERING IS NULL ...");
  204.     //          System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC ...");
  205.                 //synchronized (this.semaphore) {
  206.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("read");
  207.                 try {
  208.     //              System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] CHUNKS ... " + this.chunkList.size() );
  209.     //              System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] Next-Byte for write ... " + this.indexNextByteReceivedForWrite );
  210.                     if ( this.chunkList.size() > 0 ) {
  211.                         this.bytesReceived = this.chunkList.remove(0);
  212.                     } else
  213.                     if ( this.indexNextByteReceivedForWrite > 0 ) {
  214.                         this.bytesReceived = new byte[ this.indexNextByteReceivedForWrite ];
  215.                         System.arraycopy( this.bytesReading, 0, this.bytesReceived, 0, this.indexNextByteReceivedForWrite );
  216.                         this.indexNextByteReceivedForWrite = 0;
  217.                     }
  218.                     this.indexNextByteReceivedForRead = 0;
  219.                     if(this.asyncWriteTask!=null) {
  220.     //                  System.out.println("["+this.source+"] READ for WRITE COMPLETE IN SEMAPHORE");
  221.                         this.asyncWriteTask.complete(true);
  222.                     }
  223.                 } finally {
  224.                     this.lockPIPE.release(lock, "read");
  225.                 }
  226.     //          System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] SYNC OK");
  227.             }
  228.            
  229.             if(this.bytesReceived==null){
  230.                 return -1;
  231.             }
  232.            
  233.             int bytesAvailableForRead = this.bytesReceived.length - this.indexNextByteReceivedForRead;
  234.             if ( bytesAvailableForRead == len ) {
  235.                 System.arraycopy( this.bytesReceived, this.indexNextByteReceivedForRead, b, off, bytesAvailableForRead );
  236.                 this.bytesReceived = null;
  237.                 this.indexNextByteReceivedForRead = -1;
  238.     //          System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] NEXT INDEX["+this.indexNextByteReceivedForRead+"] RETURN LETTI("+len+") EXIT A");
  239.                 return len;
  240.             } else
  241.             if ( bytesAvailableForRead > len ) {
  242.                 System.arraycopy( this.bytesReceived, this.indexNextByteReceivedForRead, b, off, len );
  243.                 this.indexNextByteReceivedForRead += len;
  244.     //          System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] NEXT INDEX["+this.indexNextByteReceivedForRead+"] RETURN LETTI("+len+") EXIT B");
  245.                 return len;
  246.             } else {
  247.                 System.arraycopy( this.bytesReceived, this.indexNextByteReceivedForRead, b, off, bytesAvailableForRead );
  248.                 this.bytesReceived = null;
  249.                 this.indexNextByteReceivedForRead = -1;
  250.     //          System.out.println("########### READ b["+b.length+"] off["+off+"] len["+len+"] NEXT INDEX["+this.indexNextByteReceivedForRead+"] RETURN LETTI("+bytesAvailableForRead+") EXIT C");
  251.                 return bytesAvailableForRead;
  252.             }
  253.         }
  254.         catch(IOException io) {
  255.             throw io;
  256.         }
  257.         catch(Throwable t) {
  258.             throw new IOException(t.getMessage(),t);
  259.         }
  260.     }

  261.     @Override
  262.     public int read() throws IOException {
  263.         byte[]b = new byte[1];
  264.         int len = this.read(b);
  265.         if ( len == 1 )
  266.             return b[0] & 0xFF;
  267.         if ( len == -1 )
  268.             return -1;
  269.         throw new IOException( "Cannot read single byte" );
  270.     }

  271.    
  272.     @Override
  273.     public void close() throws IOException {
  274.         try {
  275.             if ( this.stop == false ) {
  276.    
  277.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("close");
  278.                 try {
  279.                     if ( this.indexNextByteReceivedForWrite > 0 ) {
  280.                         byte[] lastChunk = new byte[ this.indexNextByteReceivedForWrite ];
  281.                         System.arraycopy( this.bytesReading, 0, lastChunk, 0, this.indexNextByteReceivedForWrite );
  282.                         this.chunkList.add( lastChunk );
  283.                         this.indexNextByteReceivedForWrite = 0;
  284.                     }
  285.                     this.bytesReading = null;
  286.                     this.stop = true;
  287.                 }finally{
  288.                     this.lockPIPE.release(lock, "close");
  289.                 }
  290.             }
  291.             if(this.asyncWriteTask!=null) {
  292.     //          System.out.println("["+this.source+"] CLOSE for WRITE COMPLETE");
  293.                 this.asyncWriteTask.complete(true);
  294.             }
  295.             if(this.asyncReadTask!=null) {
  296.     //          System.out.println("["+this.source+"] CLOSE for READ COMPLETE");
  297.                 this.asyncReadTask.complete(true);
  298.             }
  299.         }
  300.         catch(Throwable t) {
  301.             throw new IOException(t.getMessage(),t);
  302.         }
  303.     }

  304.    
  305.     // ALIMENTAZIONE
  306.    
  307.     private void writeWaitEmptyBuffer() throws IOException{
  308.         try {
  309.             if ( this.useThreadSleep ) {
  310.                 if ( readBytesPending() && this.chunkList.size() >= MAX_QUEUE ) {
  311.                     int i = 0;
  312.                     while ( this.stop == false && i < ITERAZIONI_WAIT ) {
  313.                         Utilities.sleep((i+1));
  314.                         i = i + i;
  315.                     }
  316.                     if ( i >= ITERAZIONI_WAIT ) {
  317.                         throw new IOException(getPrefixSource()+"Timeout, no buffer available for write");
  318.                     }
  319.                 }
  320.             } else {
  321.                 boolean wait = false;
  322.                 SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("writeWaitEmptyBuffer");
  323.                 try {
  324.                     if ( readBytesPending() && this.chunkList.size() >= MAX_QUEUE ) {
  325.                         if ( this.stop == false ) {
  326.                             this.asyncWriteTask = new CompletableFuture<Boolean>();
  327.                             wait = true;
  328.                         }
  329.                     }
  330.                 }finally{
  331.                     this.lockPIPE.release(lock, "writeWaitEmptyBuffer");
  332.                 }
  333.                 if(wait) {
  334.                     try {
  335.     //                  System.out.println("["+this.source+"] ASPETTO WRITE ...");
  336.                         if(this.timeoutMs>0) {
  337.                             this.asyncWriteTask.get(this.timeoutMs,TimeUnit.MILLISECONDS );
  338.                         }
  339.                         else {
  340.                             this.asyncWriteTask.get();
  341.                         }
  342.     //                  System.out.println("["+this.source+"] WRITE OK");
  343.                     }catch(InterruptedException timeout) {
  344.                         Thread.currentThread().interrupt();
  345.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  346.                     }
  347.                     catch(Exception timeout) {
  348.                         throw new IOException(getPrefixSource()+"Timeout, no bytes available for read: "+timeout.getMessage(),timeout);
  349.                     }
  350.                 }
  351.             }
  352.         }
  353.         catch(IOException io) {
  354.             throw io;
  355.         }
  356.         catch(Throwable t) {
  357.             if(t !=null && t instanceof InterruptedException) {
  358.                 Thread.currentThread().interrupt();
  359.             }
  360.             throw new IOException(t.getMessage(),t);
  361.         }
  362.     }
  363.    
  364.     @Override
  365.     public void write(byte b) throws IOException{
  366.        
  367.         try {
  368.        
  369.             //this.log.debug("########### WRITE byte .....");
  370.            
  371.             if ( this.bytesReading == null ) {
  372.                 throw new IOException(getPrefixSource()+"Stream already closed");
  373.             }
  374.            
  375.             this.writeWaitEmptyBuffer();
  376.                    
  377.             //this.log.debug("########### WRITE byte SYNC ...");
  378.             //synchronized (this.semaphore) {
  379.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b)");
  380.             try {
  381.                 this.bytesReading[ this.indexNextByteReceivedForWrite++ ] = b;
  382.                 if ( this.indexNextByteReceivedForWrite == this.bytesReading.length ) {
  383.                     this.chunkList.add( this.bytesReading );
  384.                     initReadingBuffer();
  385.                 }
  386.                 if(this.asyncReadTask!=null) {
  387.     //              System.out.println("["+this.source+"] WRITE for READ COMPLETE 1");
  388.                     this.asyncReadTask.complete(true);
  389.                 }
  390.                 //this.log.debug("########### WRITE byte SYNC OK");
  391.     //          System.out.println("########### WRITE byte SYNC OK ADD[1] SIZE_ATTUALE["+ this.indexNextByteReceivedForWrite + " - " + this.chunkList.size()+"]");
  392.             }finally{
  393.                 this.lockPIPE.release(lock, "write(b)");
  394.             }
  395.            
  396.         }
  397.         catch(IOException io) {
  398.             throw io;
  399.         }
  400.         catch(Throwable t) {
  401.             throw new IOException(t.getMessage(),t);
  402.         }

  403.     }
  404.    
  405.     @Override
  406.     public void write(byte [] b) throws IOException{
  407.        
  408.         try {
  409.        
  410.             //this.log.debug("########### WRITE byte ["+b.length+"] .....");
  411.            
  412.             if ( this.bytesReading == null ) {
  413.                 throw new IOException(getPrefixSource()+"Stream already closed");
  414.             }
  415.            
  416.             this.writeWaitEmptyBuffer();
  417.                    
  418.             //this.log.debug("########### WRITE byte ["+b.length+"] SYNC ...");
  419.             //synchronized (this.semaphore) {
  420.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b[])");
  421.             try {
  422.                 int offset = 0;
  423.                 int bytesNum = b.length;
  424.                 while ( bytesNum > 0 ) {
  425.                     int freeSpace = this.bytesReading.length - this.indexNextByteReceivedForWrite;
  426.                     int chunkLen = ( bytesNum <= freeSpace ? bytesNum : freeSpace );
  427.                     System.arraycopy( b, offset, this.bytesReading, this.indexNextByteReceivedForWrite, chunkLen );
  428.                     this.indexNextByteReceivedForWrite += chunkLen;
  429.                     if ( this.indexNextByteReceivedForWrite == this.bytesReading.length ) {
  430.                         this.chunkList.add( this.bytesReading );
  431.                         initReadingBuffer();
  432.                     }
  433.                     bytesNum -= chunkLen;
  434.                     offset += chunkLen;
  435.                 }
  436.                 if(this.asyncReadTask!=null) {
  437.     //              System.out.println("["+this.source+"] WRITE for READ COMPLETE 2");
  438.                     this.asyncReadTask.complete(true);
  439.                 }
  440.                 //this.log.debug("########### WRITE byte ["+b.length+"] SYNC OK");
  441.     //          System.out.println("########### WRITE byte SYNC OK ADD["+b.length+"] SIZE_ATTUALE["+ this.indexNextByteReceivedForWrite + " - " + this.chunkList.size()+"]");
  442.             }finally {
  443.                 this.lockPIPE.release(lock, "write(b[])");
  444.             }
  445.            
  446.         }
  447.         catch(IOException io) {
  448.             throw io;
  449.         }
  450.         catch(Throwable t) {
  451.             throw new IOException(t.getMessage(),t);
  452.         }

  453.     }
  454.    
  455.     @Override
  456.     public void write(byte[] b, int off, int len) throws IOException{
  457.        
  458.         try {
  459.        
  460.             //this.log.debug("########### WRITE byte ["+b.length+"] off:"+off+" len:"+len+" .....");
  461.            
  462.             if ( this.bytesReading == null ) {
  463.                 throw new IOException(getPrefixSource()+"Stream already closed");
  464.             }
  465.            
  466.             this.writeWaitEmptyBuffer();
  467.                    
  468.             //this.log.debug("########### WRITE byte ["+b.length+"] off:"+off+" len:"+len+" SYNC ...");
  469.             //synchronized (this.semaphore) {
  470.             SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b[],off,len)");
  471.             try {
  472.                 int offset = off;
  473.                 int bytesNum = len;
  474.                 while ( bytesNum > 0 ) {
  475.                     int freeSpace = this.bytesReading.length - this.indexNextByteReceivedForWrite;
  476.                     int chunkLen = ( bytesNum <= freeSpace ? bytesNum : freeSpace );
  477.                     System.arraycopy( b, offset, this.bytesReading, this.indexNextByteReceivedForWrite, chunkLen );
  478.                     this.indexNextByteReceivedForWrite += chunkLen;
  479.                     if ( this.indexNextByteReceivedForWrite == this.bytesReading.length ) {
  480.                         this.chunkList.add( this.bytesReading );
  481.                         initReadingBuffer();
  482.                     }
  483.                     bytesNum -= chunkLen;
  484.                     offset += chunkLen;
  485.                 }
  486.                 if(this.asyncReadTask!=null) {
  487.     //              System.out.println("["+this.source+"] WRITE for READ COMPLETE 3");
  488.                     this.asyncReadTask.complete(true);
  489.                 }
  490.                 //this.log.debug("########### WRITE byte ["+b.length+"] off:"+off+" len:"+len+" SYNC OK");
  491.     //          System.out.println("########### WRITE byte SYNC OK ADD["+b.length+"] SIZE_ATTUALE["+ this.indexNextByteReceivedForWrite + " - " + this.chunkList.size()+"]");
  492.             }finally {
  493.                 this.lockPIPE.release(lock, "write(b[],off,len)");
  494.             }
  495.            
  496.         }
  497.         catch(IOException io) {
  498.             throw io;
  499.         }
  500.         catch(Throwable t) {
  501.             throw new IOException(t.getMessage(),t);
  502.         }

  503.     }

  504. }