NotifierInputStream.java
- /*
- * GovWay - A customizable API Gateway
- * https://govway.org
- *
- * Copyright (c) 2005-2025 Link.it srl (https://link.it).
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 3, as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package org.openspcoop2.utils.io.notifier;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.slf4j.Logger;
- import com.sun.xml.messaging.saaj.packaging.mime.internet.ContentType;
- import com.sun.xml.messaging.saaj.packaging.mime.internet.ParseException;
- /**
- * NotifierInputStream
- *
- * @author Poli Andrea (apoli@link.it)
- * @author $Author$
- * @version $Rev$, $Date$
- */
- public class NotifierInputStream extends InputStream {
-
- /* ****** VARIABLE ******* */
-
- /**
- * Original Input Stream
- */
- private InputStream is;
- /**
- * Original Input Stream Consumed
- */
- private boolean isOriginalInputStreamConsumed;
-
- /**
- * Current position in this input stream
- */
- private int currentReadPositionFromStream = 0;
-
- /**
- * Content type of the stream
- */
- private ContentType contentType = null;
-
- /**
- * Indication if the buffer is enabled
- */
- private boolean bufferEnabled = false;
- public boolean isBufferEnabled() {
- return this.bufferEnabled;
- }
- /**
- * Buffer
- */
- private ByteArrayOutputStream buffer = null;
- /**
- * Bytes present in the stream after the completion of reading
- */
- private byte[] contentCompleteReadedFromStream = null;
-
- /**
- * StreamingHandler
- */
- private Map<String,StreamingHandler> streamingHandlers = new HashMap<String, StreamingHandler>();
- private List<String> streamingHandlersIds = new ArrayList<>(); // Per preservare l'ordine di inserimento
-
- /**
- * throwStreamingHandlerException
- */
- private boolean throwStreamingHandlerException = false;
-
- /**
- * InputStream is closed
- */
- private boolean isClosed = false;
-
-
- /**
- * Logger
- */
- private Logger log = null;
-
-
- /* ****** CONSTRUCTOR ******* */
-
- public NotifierInputStream(InputStream is,String contentType,NotifierInputStreamParams params) throws IOException, ParseException{
-
- // Set content type of the stream
- //try{
- if(contentType!=null){
- this.contentType = new ContentType(contentType);
- }
- //}catch(Exception e){
- // throw new IOException(e.getMessage(),e);
- //}
-
- // Set original input stream
- if(is==null){
- throw new ParseException("Original InputStream not defined in args");
- }
- this.is = is;
- //System.out.println("@@@@@@@@ NotifierInputStream: "+is.getClass().getName());
-
- // Initialize Streaming Handler List
- if(params.sizeStreamingHandlers()>0){
- for (String streamingHandlerId : params.getStreamingHandlerIds()) {
- try{
- //System.out.println("@@@@@@@@ INIT HANDLERS");
- this.streamingHandlers.put(streamingHandlerId, params.getStreamingHandler(streamingHandlerId));
- this.streamingHandlersIds.add(streamingHandlerId);
- }catch(Exception e){
- throw new IOException("Streaming Handler initialization failed (id:"+streamingHandlerId+")");
- }
- }
- }
-
- // throwStreamingHandlerException
- this.throwStreamingHandlerException = params.isThrowStreamingHandlerException();
-
- // Log
- this.log = params.getLog();
-
- // Buffering
- if(params.isBufferEnabled()){
- this.setONBuffering();
- }
-
- }
-
-
-
-
-
-
- /* ****** GET ******* */
- public ContentType getContentType() {
- return this.contentType;
- }
-
-
-
-
- /* ******* INPUT STREAM INTERFACE METHODS ******* */
-
- @Override
- public int read(byte[] b) throws IOException {
- return this.read(b, 0, b.length);
- }
-
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return this.read_engine(b, off, len, true);
- }
-
- // java.io.FileOutputStream fout = null;
- // java.io.File f = null;
- // private void debug(byte[] b, int off, int letti){
- // try{
- // if(this.fout==null){
- // this.f = java.io.File.createTempFile("NotifierInputStream", "tmp");
- // this.fout = new java.io.FileOutputStream(this.f);
- // this.fout.write(b, off, letti);
- // }
- // }catch(Exception e){
- // System.out.println("@@@@@@@@ DEBUG ERROR:"+e.getMessage());
- // }
- // }
- // private void printFile(){
- // try{
- // this.fout.flush();
- // this.fout.close();
- // }catch(Exception e){
- // System.out.println("@@@@@@@@ FILE ERROR:"+e.getMessage());
- // }
- // System.out.println("@@@@@@@@ FILE["+this.f.getAbsolutePath()+"]");
- // }
-
- private int read_engine(byte[] b, int off, int len, boolean incrementCurrentReadPositionFromStream) throws IOException {
-
- // if(!incrementCurrentReadPositionFromStream){
- // System.out.println("@@@@@@@@ READ FROM SERIALIZE b["+b.length+"] offset["+off+"] length["+len+"]");
- // }
- // else{
- // System.out.println("@@@@@@@@ READ b["+b.length+"] offset["+off+"] length["+len+"]");
- // }
-
- int readBytes = 0;
- int offset = off;
- try{
-
- if(this.isOriginalInputStreamConsumed){
-
- //System.out.println("@@@@@@@@ IS COMPLITED.... ");
-
- // The stream was completely consumed and the bytes are stored in the variable 'contentReadFromStream'
- if(this.contentCompleteReadedFromStream!=null){
-
- // System.out.println("@@@@@@@@ IS COMPLITED currentReadPositionFromStream["+this.currentReadPositionFromStream
- // +"] contentCompleteReadedFromStream["+this.contentCompleteReadedFromStream.length+"].... ");
-
- if(this.currentReadPositionFromStream>=this.contentCompleteReadedFromStream.length){
- // stream completed
- // System.out.println("@@@@@@@@ IS COMPLITED RETURN -1");
- // this.printFile();
- return -1;
- }
- else{
- int position = this.currentReadPositionFromStream;
- while( (position<this.contentCompleteReadedFromStream.length) && (readBytes<len) ){
-
- if(offset>=b.length){
- throw new IndexOutOfBoundsException("Offset: "+offset+" , byte[] length: "+b.length);
- }
-
- b[offset] = this.contentCompleteReadedFromStream[position];
- readBytes++;
- offset++;
- }
- // System.out.println("@@@@@@@@ IS COMPLITED RETURN "+readBytes+" BYTES");
- // this.debug(b, offset, readBytes);
- return readBytes;
- }
-
- }
-
- // The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled
- else{
- //throw new IOException("The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled");
- // NOTE: the engine can call read method more times ...
- return -1;
- }
-
- }
-
- // Reading in the progress of the stream
- else{
-
- // performRead: enable dispatching to streaming handlers
- readBytes = performReadBytes(b,off,len);
-
- // stream completed
- if(readBytes == -1){
- //System.out.println("@@@@@@@@ IS complite ");
- this.isOriginalInputStreamConsumed = true;
- }
-
- // if enabled buffering, the byte read is saved
- if(this.bufferEnabled){
- if(this.isOriginalInputStreamConsumed){
- //System.out.println("@@@@@@@@ IS finalize! ");
- this.finalizeBuffer();
- }else{
- //System.out.println("@@@@@@@@ write offset["+offset+"] readBytes["+readBytes+"] ");
- this.buffer.write(b, offset, readBytes);
- }
- }
-
- // // Check if exists more bytes (questi codice non dovrebbe servire)
- // int bytesMaxRead = len-off;
- // if(readBytes<bytesMaxRead){
- // //System.out.println("@@@@@@@@ Check if exists more bytes ...");
- // int byteRead = read_engine(true);
- // //System.out.println("@@@@@@@@ Check if exists more bytes, return: "+byteRead);
- // if(byteRead!=-1){
- // //System.out.println("@@@@@@@@ Exists more byte, set return at position ["+readBytes+"]");
- // b[readBytes]=(byte)byteRead;
- // readBytes++;
- // }
- // }
-
- //System.out.println("@@@@@@@@ Return dopo Perform "+readBytes);
- // if(readBytes == -1){
- // this.printFile();
- // }else{
- // this.debug(b, offset, readBytes);
- // }
- return readBytes;
- }
- }finally{
- if(incrementCurrentReadPositionFromStream){
- this.currentReadPositionFromStream=this.currentReadPositionFromStream+readBytes;
- }
- }
- }
-
- private int performReadBytes(byte[] b, int off, int len) throws IOException {
-
- //System.out.println("@@@@@@@@ performReadBytes ...");
-
- int readBytes = 0;
-
- if(this.is==null){
- //System.out.println("@@@@@@@@ return -1 Stream is null ...");
- readBytes = -1;
- }
- else{
- readBytes = this.is.read(b,off,len);
- }
- //System.out.println("@@@@@@@@ ["+readBytes+"] bytes read ...");
-
- // enable dispatching to streaming handlers
- ByteArrayOutputStream bout = null;
- for(String streamingHandlerId : this.streamingHandlersIds) {
- //System.out.println("@@@@@@@@ Streaming handler ["+this.streamingHandlersIds.size()+"]");
- StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
- try{
- if(readBytes==-1){
- //System.out.println("@@@@@@@@ return -1 Dispatching end...");
- streamingHandler.end();
- }
- else{
- if(bout==null){
- bout = new ByteArrayOutputStream();
- bout.write(b, off, readBytes);
- bout.flush();
- bout.close();
- }
- //System.out.println("@@@@@@@@ return bytes "+bout.size()+" Dispatching ...");
- streamingHandler.feed(bout.toByteArray());
- }
- }catch(Throwable e){
- if(this.log!=null){
- this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
- }
- if(this.throwStreamingHandlerException){
- throw new IOException("["+streamingHandlerId+"] "+e.getMessage());
- }
- }
- }
-
- return readBytes;
- }
-
-
- @Override
- public int read() throws IOException {
- return this.read_engine(true);
- }
- private int read_engine(boolean incrementCurrentReadPositionFromStream) throws IOException {
- try{
-
- // if(!incrementCurrentReadPositionFromStream){
- // System.out.println("@@@@@@@@ READ FROM SERIALIZE");
- // }
- // else{
- // System.out.println("@@@@@@@@ READ");
- // }
-
- if(this.isOriginalInputStreamConsumed){
-
- //System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE.... ");
-
- // The stream was completely consumed and the bytes are stored in the variable 'contentReadFromStream'
- if(this.contentCompleteReadedFromStream!=null){
-
- // System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE currentReadPositionFromStream["+this.currentReadPositionFromStream
- // +"] contentCompleteReadedFromStream["+this.contentCompleteReadedFromStream.length+"].... ");
-
- if(this.currentReadPositionFromStream>=this.contentCompleteReadedFromStream.length){
- // stream completed
- // System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE RETURN -1");
- // this.printFile();
- return -1;
- }
- else{
- // System.out.println("@@@@@@@@ IS COMPLITED RETURN SINGLE BYTE ("+this.contentCompleteReadedFromStream[this.currentReadPositionFromStream]+") ");
- // this.debug(new byte[]{this.contentCompleteReadedFromStream[this.currentReadPositionFromStream]}, 0, 1);
- return this.contentCompleteReadedFromStream[this.currentReadPositionFromStream];
- }
-
- }
-
- // The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled
- else{
- //throw new IOException("The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled");
- // NOTE: the engine can call read method more times ...
- return -1;
- }
-
- }
-
- // Reading in the progress of the stream
- else{
-
- // performRead: enable dispatching to streaming handlers
- int byteRead = performRead();
-
- // stream completed
- if(byteRead == -1){
- //System.out.println("@@@@@@@@ IS complite SINGLE BYTE ");
- this.isOriginalInputStreamConsumed = true;
- }
-
- // if enabled buffering, the byte read is saved
- if(this.bufferEnabled){
- if(this.isOriginalInputStreamConsumed){
- //System.out.println("@@@@@@@@ IS finalize! SINGLE BYTE");
- this.finalizeBuffer();
- }else{
- //System.out.println("@@@@@@@@ write SINGLE BYTE["+byteRead+"] ");
- this.buffer.write(byteRead);
- }
- }
-
- //System.out.println("@@@@@@@@ Return dopo Perform SINGLE BYTE "+byteRead);
- // if(byteRead == -1){
- // this.printFile();
- // }else{
- // this.debug(new byte[]{(byte)byteRead}, 0, 1);
- // }
- return byteRead;
- }
- }finally{
- if(incrementCurrentReadPositionFromStream){
- this.currentReadPositionFromStream++;
- }
- }
- }
-
- private int performRead() throws IOException {
- int b = this.is.read();
- // enable dispatching to streaming handlers
- for(String streamingHandlerId : this.streamingHandlersIds) {
- StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
- try{
- if(b==-1){
- //System.out.println("@@@@@@@@ return -1 Dispatching end ...");
- streamingHandler.end();
- }
- else{
- //System.out.println("@@@@@@@@ return un byte Dispatching ...");
- streamingHandler.feed((byte)b);
- }
- }catch(Throwable e){
- if(this.log!=null){
- this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
- }
- if(this.throwStreamingHandlerException){
- throw new IOException("["+streamingHandlerId+"] "+e.getMessage());
- }
- }
- }
- return b;
- }
-
- @Override
- public void close() throws IOException {
-
- if(this.isClosed==false){
-
- if(this.is!=null)
- this.is.close();
-
- IOException streamingHandlerException = null;
-
- // enable dispatching to streaming handlers
- for(String streamingHandlerId : this.streamingHandlersIds) {
- StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
- try{
- streamingHandler.closeResources();
- }catch(Throwable e){
- if(this.log!=null){
- this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
- }
- if(this.throwStreamingHandlerException){
- if(streamingHandlerException==null){
- // throw the first exception occurs
- streamingHandlerException = new IOException("["+streamingHandlerId+"] "+e.getMessage());
- }
- }
- }
- }
-
- this.isClosed = true;
-
- if(streamingHandlerException!=null){
- throw streamingHandlerException;
- }
- }
- }
-
-
-
-
-
-
- /* ******* HANDLERS ******* */
-
- /**
- * adds a Streaming handler to our list
- * @param handler handler to add
- * @throws IOException
- */
- public void addStreamingHandler(StreamingHandler handler) throws IOException {
-
- //System.out.println("@@@@@@@@ addStreamingHandler currentReadPositionFromStream["+this.currentReadPositionFromStream+"] COMPLITED["+this.isCompleted+"]... ");
-
- // the option of buffering can be enabled only if the stream has not yet been accessed
- if(this.currentReadPositionFromStream>0){
- if(this.bufferEnabled==false)
- throw new IOException("You can not add handler after the stream has been accessed with buffering disabled");
- else {
-
- String id = handler.getID();
- if(this.streamingHandlersIds.contains(id)){
- throw new IOException("StreamingHandler with id ["+id+"] already exists");
- }
- this.streamingHandlers.put(id, handler);
- this.streamingHandlersIds.add(id);
-
- if(this.isOriginalInputStreamConsumed){
- //System.out.println("@@@@@@@@ addStreamingHandler FEED ["+this.contentCompleteReadedFromStream.length+"]");
- handler.feed(this.contentCompleteReadedFromStream);
-
- //System.out.println("@@@@@@@@ addStreamingHandler END");
- handler.end();
- }
- else{
- //System.out.println("@@@@@@@@ addStreamingHandler FEED ["+this.buffer.size()+"]");
- handler.feed(this.buffer.toByteArray());
- }
- }
- }
-
- }
-
- /**
- * Given a StreamingHandler class, returns the corresponding handler, if one
- * Useful in case of more handlers registered to this class
- * @param clazz - the class which the needed handler belongs
- * @return corresponding handler if there is one, otherwise null
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public <T extends StreamingHandler> T getFirstStreamingHandlerByType(Class<T> clazz) throws IOException {
- for(String streamingHandlerId : this.streamingHandlersIds) {
- StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
- if (streamingHandler.getClass().equals(clazz))
- return (T) streamingHandler;
- }
- throw new IOException("StreamingHandler with type ["+clazz.getName()+"] not exists");
- }
- public StreamingHandler getStreamingHandler(String id) throws IOException{
- if(this.streamingHandlersIds.contains(id)){
- return this.streamingHandlers.get(id);
- }
- else{
- throw new IOException("StreamingHandler with id ["+id+"] not exists");
- }
- }
-
-
-
-
-
- /* ******* BUFFER ******* */
-
- public void setONBuffering() throws IOException {
-
- // the option of buffering can be enabled only if the stream has not yet been accessed
- if(this.currentReadPositionFromStream>0){
- throw new IOException("You can not enable buffering after the stream has been accessed");
- }
- if(this.bufferEnabled){
- throw new IOException("The buffering is already enabled");
- }
-
- // enable buffering
- this.bufferEnabled = true;
- this.buffer = new ByteArrayOutputStream();
- }
-
- public void setOFFBuffering() throws IOException {
- this.setOFFBuffering(true);
- }
- public void setOFFBuffering(boolean releaseBufferReaded) throws IOException {
-
- //System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+")");
-
- if(this.bufferEnabled==false){
- throw new IOException("The buffering is not enabled");
- }
-
- // disable buffer
- this.bufferEnabled = false;
- if(releaseBufferReaded){
- //System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+") rilascio");
- this.contentCompleteReadedFromStream = null;
- if(this.buffer!=null){
- this.buffer.close();
- this.buffer = null;
- //System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+") rilasciato");
- }
- }
- }
-
- private void finalizeBuffer() throws IOException{
-
- if(this.bufferEnabled==false){
- throw new IOException("BufferMode is not enabled");
- }
-
- if(this.contentCompleteReadedFromStream!=null){
- throw new IOException("bufferingComplete already invoked");
- }
-
- this.buffer.flush();
- this.buffer.close();
- this.contentCompleteReadedFromStream=this.buffer.toByteArray();
- this.buffer=null; // G.C.
-
- }
-
-
-
-
-
-
-
-
-
-
- /* ******* SERIALIZE ******* */
-
- public void serialize(OutputStream out) throws IOException{
- this.serializeEngine(out,false);
- }
-
- public void serializeAndConsume(OutputStream out) throws IOException{
- this.serializeEngine(out,true);
- }
-
- public byte[] serialize() throws IOException{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- this.serializeEngine(bout,false);
- bout.flush();
- bout.close();
- return bout.toByteArray();
- }
-
- public byte[] serializeAndConsume() throws IOException{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- this.serializeEngine(bout,true);
- bout.flush();
- bout.close();
- return bout.toByteArray();
- }
-
- private void serializeEngine(OutputStream out,boolean consume) throws IOException{
-
- //System.out.println("@@@@@@@@ serializeEngine");
-
- // If the stream is already saved return byte array
- //if(this.bufferEnabled && this.contentCompleteReadedFromStream!=null){
- if(this.contentCompleteReadedFromStream!=null){
- out.write(this.contentCompleteReadedFromStream);
- return;
- }
-
- // If buffering is enabled, but the stream is not completely consumed, write the bytes stored in the buffer in the output stream
- //if(this.bufferEnabled && this.contentCompleteReadedFromStream==null){
- if(this.buffer!=null){
- this.buffer.flush();
- if(this.buffer.size()>0){
- out.write(this.buffer.toByteArray());
- }
- }
-
- if(consume){
- if(this.bufferEnabled){
- this.setOFFBuffering();
- }
- else if(this.buffer!=null){
- this.buffer.close();
- this.buffer = null;
- }
- }
-
- // Conclude to consume the stream remaining
- // If buffering is enabled all bytes read will stored in the buffer
- byte[] buffer = new byte[1024];
- int byteRead = this.read_engine(buffer, 0, buffer.length, false);
- while ( byteRead != -1 ){
- out.write(buffer,0,byteRead);
- byteRead = this.read_engine(buffer, 0, buffer.length, false);
- }
- }
-
-
-
- }