PipedUnblockedStream.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.utils.io.notifier.unblocked;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.openspcoop2.utils.SemaphoreLock;
import org.openspcoop2.utils.Utilities;
import org.slf4j.Logger;
/**
 * In-memory pipe: bytes handed to the {@code write} methods are accumulated in an internal
 * buffer and later returned by the {@code read} methods.
 * <p>
 * Two buffers are kept in memory:
 * <ul>
 * <li>{@code bout}: the buffer the {@code write} methods append to;</li>
 * <li>{@code bytesReceived}: a snapshot of {@code bout}, taken by {@code read}, that is
 *     consumed by subsequent reads until it is exhausted.</li>
 * </ul>
 * A read that finds no data waits until a write or {@link #close()} occurs; a write that finds
 * the write buffer larger than the configured threshold waits until a read drains it.
 * When the timeout is greater than zero both waits are bounded by it, otherwise they are unbounded.
 * <p>
 * NOTE(review): the class is presumably meant to be used by one writing thread and one reading
 * thread; confirm against the callers.
 *
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public class PipedUnblockedStream extends IPipedUnblockedStream {

    private static final String NO_BYTES_AVAILABLE_READ = "Timeout, no bytes available for read: ";
    private static final String NO_BUFFER_AVAILABLE_WRITE = "Timeout, no buffer available for write: ";
    private static final String STREAM_ALREADY_CLOSED = "Stream already closed";

    /** Maximum number of polling rounds performed when the sleep-based wait strategy is enabled. */
    private static final int ITERAZIONI_WAIT = 128;

    protected Logger log = null;

    /** Threshold (in bytes) above which a write waits for the write buffer to be drained. */
    private long sizeBuffer;
    /** Maximum wait, in milliseconds, for a blocked read/write; values &lt;= 0 mean "wait indefinitely". */
    private int timeoutMs;
    /** Optional label used as prefix of the error messages. */
    private String source = null;

    private final org.openspcoop2.utils.Semaphore lockPIPE = new org.openspcoop2.utils.Semaphore("PipedUnblockedStream");

    // The fields below are written by one side of the pipe and also read by the other side
    // outside of lockPIPE, hence they are declared volatile.
    private volatile ByteArrayOutputStream bout = new ByteArrayOutputStream();
    private volatile boolean stop = false;
    private volatile CompletableFuture<Boolean> asyncReadTask = null;
    private volatile CompletableFuture<Boolean> asyncWriteTask = null;

    // State touched only by the read methods.
    private byte[] bytesReceived = null;
    private int indexNextByteReceivedForRead = -1;

    /** When true the waits are implemented by polling with sleeps instead of futures; never enabled in this class. */
    private boolean useThreadSleep = false;

    /**
     * Configures the pipe.
     *
     * @param log logger stored in {@link #log}
     * @param sizeBuffer maximum amount of memory requested for the pipe; when &lt;= 0
     *        {@code Utilities.DIMENSIONE_BUFFER} is used
     * @param timeoutMs maximum wait in milliseconds for a blocked read/write (&lt;= 0: no limit)
     * @param source label used as prefix of the error messages, may be null
     */
    @Override
    public void init(Logger log, long sizeBuffer, int timeoutMs, String source) {
        this.log = log;
        // Two buffers live in memory:
        // - [bytesReceived] holds the bytes already consolidated and ready to be consumed
        // - [bout] is the buffer used to write the data
        // Both buffers may therefore be "full" at the same time, so the requested maximum size is divided by 2.
        // NOTE: each buffer may grow up to this.sizeBuffer plus the size of the byte[] supplied by the
        // last write that passed the writeWaitEmptyBuffer check.
        if(sizeBuffer<=0) {
            sizeBuffer = Utilities.DIMENSIONE_BUFFER;
        }
        this.sizeBuffer = sizeBuffer / 2;
        this.timeoutMs = timeoutMs;
        this.source = source;
    }

    /**
     * Changes the maximum wait applied to blocked reads and writes.
     *
     * @param timeoutMs timeout in milliseconds (&lt;= 0: no limit)
     */
    @Override
    public void setTimeout(int timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /**
     * @return {@code "[source] "} when a source label was configured, the empty string otherwise
     */
    public String getPrefixSource() {
        return this.source!=null ? "["+this.source+"] " : "";
    }


    // ---- shared helpers ----

    /**
     * Waits for the given future to be completed, honouring the configured timeout.
     *
     * @param task future to wait on
     * @param errorPrefix message prefix used when the wait fails
     * @throws IOException if the wait times out, is interrupted (the interrupt flag is restored) or fails
     */
    private void awaitSignal(CompletableFuture<Boolean> task, String errorPrefix) throws IOException {
        try {
            if(this.timeoutMs>0) {
                task.get(this.timeoutMs, TimeUnit.MILLISECONDS);
            }
            else {
                task.get();
            }
        }
        catch(InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IOException(getPrefixSource()+errorPrefix+interrupted.getMessage(),interrupted);
        }
        catch(Exception failed) {
            throw new IOException(getPrefixSource()+errorPrefix+failed.getMessage(),failed);
        }
    }

    /** Completes the future a write may be waiting on, if one was ever created. */
    private void wakeUpWriter() {
        CompletableFuture<Boolean> task = this.asyncWriteTask;
        if(task!=null) {
            task.complete(true);
        }
    }

    /** Completes the future a read may be waiting on, if one was ever created. */
    private void wakeUpReader() {
        CompletableFuture<Boolean> task = this.asyncReadTask;
        if(task!=null) {
            task.complete(true);
        }
    }


    // ---- INPUT STREAM ----

    /**
     * Waits until bytes are available in the write buffer or the stream is stopped.
     *
     * @throws IOException on timeout, interruption or any unexpected failure
     */
    private void readWaitBytes() throws IOException{
        try {
            if(this.useThreadSleep) {
                // Polling strategy: at most ITERAZIONI_WAIT rounds with a sleep that grows by 1 ms per round.
                // NOTE(review): the previous code never advanced the counter (i = i + i starting from 0),
                // so it polled forever; a linear increment is assumed to be the intent.
                int i = 0;
                while(!this.stop && this.bout!=null && this.bout.size()==0){
                    if(i>=ITERAZIONI_WAIT){
                        throw new IOException(getPrefixSource()+"Timeout, no bytes available for read");
                    }
                    Utilities.sleep((i+1));
                    i++;
                }
            }
            else {
                CompletableFuture<Boolean> pending = null;
                SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("readWaitBytes");
                try {
                    // The future is published while holding the lock, so a write that appends afterwards sees it.
                    if(!this.stop && this.bout!=null && this.bout.size()==0) {
                        pending = new CompletableFuture<>();
                        this.asyncReadTask = pending;
                    }
                }finally {
                    this.lockPIPE.release(lock, "readWaitBytes");
                }
                if(pending!=null) {
                    awaitSignal(pending, NO_BYTES_AVAILABLE_READ);
                }
            }
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            if(t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IOException(t.getMessage(),t);
        }
    }

    /**
     * Releases the write buffer, wakes a possibly waiting writer and signals end of stream.
     *
     * @return always -1
     * @throws IOException if closing the write buffer fails
     */
    private int endOfStream() throws IOException {
        this.bout.close();
        this.bout = null;
        wakeUpWriter();
        return -1;
    }

    /**
     * Moves, under lock, the whole content of the write buffer into {@code bytesReceived}
     * and wakes a writer that may be waiting for the buffer to be emptied.
     *
     * @throws IOException if flushing the write buffer fails
     */
    private void drainWriteBuffer() throws IOException {
        SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("read");
        try {
            this.bout.flush();
            this.bytesReceived = this.bout.toByteArray();
            this.indexNextByteReceivedForRead = 0;
            this.bout.reset();
            wakeUpWriter();
        }finally {
            this.lockPIPE.release(lock, "read");
        }
    }

    /**
     * Reads up to {@code b.length} bytes; equivalent to {@code read(b, 0, b.length)}.
     */
    @Override
    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
     * <p>
     * Bytes are served from the current snapshot; when no snapshot is pending, the write buffer is
     * drained into a new one, waiting for data if the buffer is empty and the stream is not stopped.
     *
     * @return the number of bytes copied, or -1 when the stream is stopped and no data is left
     * @throws IOException on timeout while waiting for data or on any unexpected failure
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        try {
            if(this.bytesReceived==null){
                if(this.stop){
                    if(this.bout==null){
                        wakeUpWriter();
                        return -1;
                    }
                    if(this.bout.size()<=0){
                        return endOfStream();
                    }
                }
                else if(this.bout.size()==0){
                    readWaitBytes();
                    if(this.bout==null) {
                        // Defensive check: the buffer may no longer be available once the wait is over.
                        wakeUpWriter();
                        return -1;
                    }
                }
            }

            // The stream may have been stopped while waiting: nothing left to read.
            if(this.bytesReceived==null && this.stop && this.bout.size()<=0){
                return endOfStream();
            }

            if(this.bytesReceived==null){
                drainWriteBuffer();
            }

            int available = this.bytesReceived.length - this.indexNextByteReceivedForRead;
            int count = Math.min(available, len);
            if(count>0){
                System.arraycopy(this.bytesReceived, this.indexNextByteReceivedForRead, b, off, count);
                this.indexNextByteReceivedForRead += count;
            }
            if(available<=len){
                // Snapshot fully consumed: the next read takes a new one from the write buffer.
                this.bytesReceived = null;
                this.indexNextByteReceivedForRead = -1;
            }
            return count;
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            throw new IOException(t.getMessage(),t);
        }
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as an unsigned value (0-255), or -1 at end of stream
     * @throws IOException if the underlying read fails or returns no byte
     */
    @Override
    public int read() throws IOException {
        byte[] single = new byte[1];
        int len = this.read(single);
        if(len==1) {
            return single[0] & 0xFF;
        }
        if(len==-1) {
            return -1;
        }
        throw new IOException( "Cannot read single byte" );
    }

    /**
     * Marks the stream as stopped and wakes up any pending read or write.
     * Bytes still present in the write buffer are kept and remain readable.
     *
     * @throws IOException on any unexpected failure
     */
    @Override
    public void close() throws IOException {
        try {
            SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("close");
            try {
                if(!this.stop){
                    if(this.bout.size()<=0 ){
                        this.bout.flush();
                        this.bout.close();
                        // bout is intentionally not set to null here: a concurrent read would find it null
                    }
                    this.stop = true;
                }
            }finally{
                this.lockPIPE.release(lock, "close");
            }
            wakeUpWriter();
            wakeUpReader();
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            throw new IOException(t.getMessage(),t);
        }
    }


    // ---- FEEDING (write side) ----

    /**
     * When the write buffer exceeds the configured threshold, waits until it has been drained
     * by a read or the stream is stopped.
     *
     * @throws IOException on timeout, interruption or any unexpected failure
     */
    private void writeWaitEmptyBuffer() throws IOException{
        try {
            if(this.useThreadSleep) {
                if(this.bout.size()>this.sizeBuffer){
                    // Polling strategy: at most ITERAZIONI_WAIT rounds with a sleep that grows by 1 ms per round.
                    // NOTE(review): the previous code never advanced the counter (i = i + i starting from 0),
                    // so it polled forever; a linear increment is assumed to be the intent.
                    int i = 0;
                    while(!this.stop && this.bout.size()>0){
                        if(i>=ITERAZIONI_WAIT){
                            throw new IOException(getPrefixSource()+"Timeout, no buffer available for write");
                        }
                        Utilities.sleep((i+1));
                        i++;
                    }
                }
            }
            else {
                CompletableFuture<Boolean> pending = null;
                SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("writeWaitEmptyBuffer");
                try {
                    // sizeBuffer is never negative, so exceeding it also implies a non-empty buffer.
                    // The future is published while holding the lock, so a read that drains afterwards sees it.
                    if(!this.stop && this.bout.size()>this.sizeBuffer) {
                        pending = new CompletableFuture<>();
                        this.asyncWriteTask = pending;
                    }
                }finally{
                    this.lockPIPE.release(lock, "writeWaitEmptyBuffer");
                }
                if(pending!=null) {
                    awaitSignal(pending, NO_BUFFER_AVAILABLE_WRITE);
                }
            }
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            if(t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IOException(t.getMessage(),t);
        }
    }

    /**
     * Fails if the write buffer has already been released, then waits for room in it.
     *
     * @throws IOException if the stream is already closed or the wait fails
     */
    private void ensureWritable() throws IOException {
        if(this.bout==null){
            throw new IOException(getPrefixSource()+STREAM_ALREADY_CLOSED);
        }
        this.writeWaitEmptyBuffer();
    }

    /**
     * Appends a single byte to the write buffer and wakes a waiting reader.
     *
     * @throws IOException if the stream is already closed, the wait for room fails or the append fails
     */
    @Override
    public void write(byte b) throws IOException{
        try {
            ensureWritable();
            SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b)");
            try {
                this.bout.write(b);
                wakeUpReader();
            }finally{
                this.lockPIPE.release(lock, "write(b)");
            }
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            throw new IOException(t.getMessage(),t);
        }
    }

    /**
     * Appends the whole array to the write buffer and wakes a waiting reader.
     *
     * @throws IOException if the stream is already closed, the wait for room fails or the append fails
     */
    @Override
    public void write(byte [] b) throws IOException{
        try {
            ensureWritable();
            SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b[])");
            try {
                this.bout.write(b);
                wakeUpReader();
            }finally {
                this.lockPIPE.release(lock, "write(b[])");
            }
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            throw new IOException(t.getMessage(),t);
        }
    }

    /**
     * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the write buffer
     * and wakes a waiting reader.
     *
     * @throws IOException if the stream is already closed, the wait for room fails or the append fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException{
        try {
            ensureWritable();
            SemaphoreLock lock = this.lockPIPE.acquireThrowRuntime("write(b[],off,len)");
            try {
                this.bout.write(b, off, len);
                wakeUpReader();
            }finally {
                this.lockPIPE.release(lock, "write(b[],off,len)");
            }
        }
        catch(IOException io) {
            throw io;
        }
        catch(Throwable t) {
            throw new IOException(t.getMessage(),t);
        }
    }
}