NotifierInputStream.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */
package org.openspcoop2.utils.io.notifier;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;

import com.sun.xml.messaging.saaj.packaging.mime.internet.ContentType;
import com.sun.xml.messaging.saaj.packaging.mime.internet.ParseException;

/**
 * NotifierInputStream
 *
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public class NotifierInputStream extends InputStream {

	
	/* ****** VARIABLE ******* */
	
	/**
	 * Original Input Stream
	 */
	private InputStream is;	
	/**
	 * Original Input Stream Consumed
	 */
	private boolean isOriginalInputStreamConsumed;
	
	/**
	 * Current position in this input stream
	 */
	private int currentReadPositionFromStream = 0;
	
	/**
	 * Content type of the stream
	 */
	private ContentType contentType = null;
	
	/**
	 * Indication if the buffer is enabled
	 */
	private boolean bufferEnabled = false;
	public boolean isBufferEnabled() {
		return this.bufferEnabled;
	}

	/**
	 * Buffer
	 */
	private ByteArrayOutputStream buffer = null;
	/**
	 * Bytes present in the stream after the completion of reading
	 */
	private byte[] contentCompleteReadedFromStream = null;	
	
	/**
	 * StreamingHandler
	 */
	private Map<String,StreamingHandler> streamingHandlers = new HashMap<String, StreamingHandler>();
	private List<String> streamingHandlersIds = new ArrayList<>(); // Per preservare l'ordine di inserimento

	
	/**
	 * throwStreamingHandlerException
	 */
	private boolean throwStreamingHandlerException = false;
	
	/**
	 * InputStream is closed
	 */
	private boolean isClosed = false;
	
	
	/**
	 * Logger
	 */
	private Logger log = null;
	
	
	/* ****** CONSTRUCTOR ******* */
	
	public NotifierInputStream(InputStream is,String contentType,NotifierInputStreamParams params) throws IOException, ParseException{
		
		// Set content type of the stream
		//try{
		if(contentType!=null){
			this.contentType = new ContentType(contentType);
		}
		//}catch(Exception e){
		//	throw new IOException(e.getMessage(),e);
		//}
		
		// Set original input stream
		if(is==null){
			throw new ParseException("Original InputStream not defined in args");
		}
		this.is = is;
		//System.out.println("@@@@@@@@ NotifierInputStream: "+is.getClass().getName());
				
		// Initialize Streaming Handler List
		if(params.sizeStreamingHandlers()>0){
			for (String streamingHandlerId : params.getStreamingHandlerIds()) {
				try{
					//System.out.println("@@@@@@@@ INIT HANDLERS");
					this.streamingHandlers.put(streamingHandlerId, params.getStreamingHandler(streamingHandlerId));
					this.streamingHandlersIds.add(streamingHandlerId);
				}catch(Exception e){
					throw new IOException("Streaming Handler initialization failed (id:"+streamingHandlerId+")"); 
				}
			}
		}
		
		// throwStreamingHandlerException
		this.throwStreamingHandlerException = params.isThrowStreamingHandlerException();
		
		// Log
		this.log = params.getLog();
		
		// Buffering
		if(params.isBufferEnabled()){
			this.setONBuffering();
		}
		
	}
	
	
	
	
	
	
	/* ****** GET ******* */

	public ContentType getContentType() {
		return this.contentType;
	}


	
	
	
	
	/* ******* INPUT STREAM INTERFACE METHODS ******* */
	
	@Override
	public int read(byte[] b) throws IOException {
		return this.read(b, 0, b.length);
	}
	
	
	@Override
	public int read(byte[] b, int off, int len) throws IOException { 
		return this.read_engine(b, off, len, true);
	}
	
//	java.io.FileOutputStream fout = null;
//	java.io.File f = null;
//	private void debug(byte[] b, int off, int letti){
//		try{
//			if(this.fout==null){
//				this.f = java.io.File.createTempFile("NotifierInputStream", "tmp");
//				this.fout = new java.io.FileOutputStream(this.f);
//				this.fout.write(b, off, letti);
//			}
//		}catch(Exception e){
//			System.out.println("@@@@@@@@ DEBUG ERROR:"+e.getMessage());
//		}
//	}
//	private void printFile(){
//		try{
//			this.fout.flush();
//			this.fout.close();
//		}catch(Exception e){
//			System.out.println("@@@@@@@@ FILE ERROR:"+e.getMessage());
//		}
//		System.out.println("@@@@@@@@ FILE["+this.f.getAbsolutePath()+"]");
//	}
	
	private int read_engine(byte[] b, int off, int len, boolean incrementCurrentReadPositionFromStream) throws IOException {
		
//		if(!incrementCurrentReadPositionFromStream){
//			System.out.println("@@@@@@@@ READ FROM SERIALIZE b["+b.length+"] offset["+off+"] length["+len+"]");
//		}
//		else{
//			System.out.println("@@@@@@@@ READ b["+b.length+"] offset["+off+"] length["+len+"]");
//		}
		
		int readBytes = 0;
		int offset = off;
		try{
			
			if(this.isOriginalInputStreamConsumed){
			
				//System.out.println("@@@@@@@@ IS COMPLITED.... ");
				
				// The stream was completely consumed and the bytes are stored in the variable 'contentReadFromStream'
				if(this.contentCompleteReadedFromStream!=null){
					
//					System.out.println("@@@@@@@@ IS COMPLITED currentReadPositionFromStream["+this.currentReadPositionFromStream
//							+"] contentCompleteReadedFromStream["+this.contentCompleteReadedFromStream.length+"].... ");
					
					if(this.currentReadPositionFromStream>=this.contentCompleteReadedFromStream.length){
						// stream completed
//						System.out.println("@@@@@@@@ IS COMPLITED RETURN -1");
//						this.printFile();
						return -1; 
					}
					else{
						int position = this.currentReadPositionFromStream;
						while( (position<this.contentCompleteReadedFromStream.length) && (readBytes<len) ){
							
							if(offset>=b.length){
								throw new IndexOutOfBoundsException("Offset: "+offset+" , byte[] length: "+b.length);
							}
							
							b[offset] = this.contentCompleteReadedFromStream[position];
							readBytes++;
							offset++;
						}
//						System.out.println("@@@@@@@@ IS COMPLITED RETURN "+readBytes+" BYTES");
//						this.debug(b, offset, readBytes);
						return readBytes;
					}
					
				} 
				
				// The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled
				else{
					//throw new IOException("The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled");
					// NOTE: the engine can call read method more times ...
					return -1;
				}
				
			}
			
			// Reading in the progress of the stream
			else{
								
				// performRead: enable dispatching to streaming handlers
				readBytes = performReadBytes(b,off,len);
				
				// stream completed
				if(readBytes == -1){
					//System.out.println("@@@@@@@@ IS complite ");
					this.isOriginalInputStreamConsumed = true;
				}
				
				// if enabled buffering, the byte read is saved
				if(this.bufferEnabled){
					if(this.isOriginalInputStreamConsumed){
						//System.out.println("@@@@@@@@ IS finalize! ");
						this.finalizeBuffer();
					}else{
						//System.out.println("@@@@@@@@ write offset["+offset+"] readBytes["+readBytes+"] ");
						this.buffer.write(b, offset, readBytes);
					}
				}
				
//				// Check if exists more bytes (questi codice non dovrebbe servire)
//				int bytesMaxRead = len-off;
//				if(readBytes<bytesMaxRead){
//					//System.out.println("@@@@@@@@ Check if exists more bytes ...");
//					int byteRead = read_engine(true);
//					//System.out.println("@@@@@@@@ Check if exists more bytes, return: "+byteRead);
//					if(byteRead!=-1){
//						//System.out.println("@@@@@@@@  Exists more byte, set return at position ["+readBytes+"]");
//						b[readBytes]=(byte)byteRead;
//						readBytes++;
//					}
//				}
				
				//System.out.println("@@@@@@@@ Return dopo Perform "+readBytes);
//				if(readBytes == -1){
//					this.printFile();
//				}else{
//					this.debug(b, offset, readBytes);
//				}			
				return readBytes;
			}
		}finally{
			if(incrementCurrentReadPositionFromStream){
				this.currentReadPositionFromStream=this.currentReadPositionFromStream+readBytes;
			}
		}
	}
	
	private int performReadBytes(byte[] b, int off, int len) throws IOException {
		
		//System.out.println("@@@@@@@@ performReadBytes ...");
		
		int readBytes = 0;
		
		if(this.is==null){
			//System.out.println("@@@@@@@@ return -1 Stream is null ...");
			readBytes = -1;
		}
		else{
			readBytes = this.is.read(b,off,len);
		}
		//System.out.println("@@@@@@@@ ["+readBytes+"] bytes read ...");
		
		// enable dispatching to streaming handlers
		ByteArrayOutputStream bout = null;
		for(String streamingHandlerId : this.streamingHandlersIds) {
			//System.out.println("@@@@@@@@ Streaming handler ["+this.streamingHandlersIds.size()+"]");
			StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
			try{
				if(readBytes==-1){
					//System.out.println("@@@@@@@@ return -1 Dispatching end...");
					streamingHandler.end();
				}
				else{
					if(bout==null){
						bout = new ByteArrayOutputStream();
						bout.write(b, off, readBytes);
						bout.flush();
						bout.close();
					}
					//System.out.println("@@@@@@@@ return bytes "+bout.size()+" Dispatching ...");
					streamingHandler.feed(bout.toByteArray());
				}
			}catch(Throwable e){
				if(this.log!=null){
					this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
				}
				if(this.throwStreamingHandlerException){
					throw new IOException("["+streamingHandlerId+"] "+e.getMessage());
				}
			}
		}
				
		return readBytes;
	}
	


	
	@Override
	public int read() throws IOException {
		return this.read_engine(true);
	}
	private int read_engine(boolean incrementCurrentReadPositionFromStream) throws IOException {
		try{
			
//			if(!incrementCurrentReadPositionFromStream){
//				System.out.println("@@@@@@@@ READ FROM SERIALIZE");
//			}
//			else{
//				System.out.println("@@@@@@@@ READ");
//			}
			
			if(this.isOriginalInputStreamConsumed){
			
				//System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE.... ");
				
				// The stream was completely consumed and the bytes are stored in the variable 'contentReadFromStream'
				if(this.contentCompleteReadedFromStream!=null){
					
//					System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE currentReadPositionFromStream["+this.currentReadPositionFromStream
//							+"] contentCompleteReadedFromStream["+this.contentCompleteReadedFromStream.length+"].... ");
					
					if(this.currentReadPositionFromStream>=this.contentCompleteReadedFromStream.length){
						// stream completed
//						System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE RETURN -1");
//						this.printFile();
						return -1; 
					}
					else{
//						System.out.println("@@@@@@@@ IS COMPLITED RETURN SINGLE BYTE ("+this.contentCompleteReadedFromStream[this.currentReadPositionFromStream]+") ");
//						this.debug(new byte[]{this.contentCompleteReadedFromStream[this.currentReadPositionFromStream]}, 0, 1);
						return this.contentCompleteReadedFromStream[this.currentReadPositionFromStream];
					}
					
				} 
				
				// The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled
				else{
					//throw new IOException("The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled");
					// NOTE: the engine can call read method more times ...
					return -1;
				}
				
			}
			
			// Reading in the progress of the stream
			else{
								
				// performRead: enable dispatching to streaming handlers
				int byteRead = performRead();
			
				// stream completed
				if(byteRead == -1){
					//System.out.println("@@@@@@@@ IS complite SINGLE BYTE ");
					this.isOriginalInputStreamConsumed = true;
				}
				
				// if enabled buffering, the byte read is saved
				if(this.bufferEnabled){
					if(this.isOriginalInputStreamConsumed){
						//System.out.println("@@@@@@@@ IS finalize! SINGLE BYTE");
						this.finalizeBuffer();
					}else{
						//System.out.println("@@@@@@@@ write SINGLE BYTE["+byteRead+"] ");
						this.buffer.write(byteRead);
					}
				}
				
				//System.out.println("@@@@@@@@ Return dopo Perform SINGLE BYTE "+byteRead);
//				if(byteRead == -1){
//					this.printFile();
//				}else{
//					this.debug(new byte[]{(byte)byteRead}, 0, 1);
//				}
				return byteRead;
			}
		}finally{
			if(incrementCurrentReadPositionFromStream){
				this.currentReadPositionFromStream++;
			}
		}
	}
	
	private int performRead() throws IOException {
		int b = this.is.read();

		// enable dispatching to streaming handlers
		for(String streamingHandlerId : this.streamingHandlersIds) {
			StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
			try{
				if(b==-1){
					//System.out.println("@@@@@@@@ return -1 Dispatching end ...");
					streamingHandler.end();
				}
				else{
					//System.out.println("@@@@@@@@ return un byte Dispatching ...");
					streamingHandler.feed((byte)b);
				}
			}catch(Throwable e){
				if(this.log!=null){
					this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
				}
				if(this.throwStreamingHandlerException){
					throw new IOException("["+streamingHandlerId+"] "+e.getMessage());
				}
			}
		}
		return b;
	}
	
	@Override
	public void close() throws IOException {
		
		if(this.isClosed==false){
		
			if(this.is!=null)
				this.is.close();
			
			IOException streamingHandlerException = null;
			
			// enable dispatching to streaming handlers
			for(String streamingHandlerId : this.streamingHandlersIds) {
				StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
				try{
					streamingHandler.closeResources();
				}catch(Throwable e){
					if(this.log!=null){
						this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
					}
					if(this.throwStreamingHandlerException){
						if(streamingHandlerException==null){
							// throw the first exception occurs
							streamingHandlerException = new IOException("["+streamingHandlerId+"] "+e.getMessage());
						}
					}
				}
			}
					
			this.isClosed = true;
			
			if(streamingHandlerException!=null){
				throw streamingHandlerException;
			}
		}
	}
	
	
	
	
	
	
	/* ******* HANDLERS ******* */
	
	/**
	 * adds a Streaming handler to our list
	 * @param handler handler to add
	 * @throws IOException 
	 */
	public void addStreamingHandler(StreamingHandler handler) throws IOException {
		
		//System.out.println("@@@@@@@@ addStreamingHandler currentReadPositionFromStream["+this.currentReadPositionFromStream+"] COMPLITED["+this.isCompleted+"]... ");
		
		// the option of buffering can be enabled only if the stream has not yet been accessed
		if(this.currentReadPositionFromStream>0){
			if(this.bufferEnabled==false)
				throw new IOException("You can not add handler after the stream has been accessed with buffering disabled");
			else {
				
				String id = handler.getID();
				if(this.streamingHandlersIds.contains(id)){
					throw new IOException("StreamingHandler with id ["+id+"] already exists");
				}
				this.streamingHandlers.put(id, handler);
				this.streamingHandlersIds.add(id);
								
				if(this.isOriginalInputStreamConsumed){
					//System.out.println("@@@@@@@@ addStreamingHandler FEED ["+this.contentCompleteReadedFromStream.length+"]");
					handler.feed(this.contentCompleteReadedFromStream);
					
					//System.out.println("@@@@@@@@ addStreamingHandler END");
					handler.end();
				}
				else{
					//System.out.println("@@@@@@@@ addStreamingHandler FEED ["+this.buffer.size()+"]");
					handler.feed(this.buffer.toByteArray());
				}
			}
		}
		
	}
	
	/**
	 * Given a StreamingHandler class, returns the corresponding handler, if one
	 * Useful in case of more handlers registered to this class
	 * @param clazz - the class which the needed handler belongs
	 * @return corresponding handler if there is one, otherwise null
	 * @throws IOException 
	 */
	@SuppressWarnings("unchecked")
	public <T extends StreamingHandler> T getFirstStreamingHandlerByType(Class<T> clazz) throws IOException {
		for(String streamingHandlerId : this.streamingHandlersIds) {
			StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
			if (streamingHandler.getClass().equals(clazz)) 
				return (T) streamingHandler;
		}
		throw new IOException("StreamingHandler with type ["+clazz.getName()+"] not exists");
	}

	public StreamingHandler getStreamingHandler(String id) throws IOException{
		if(this.streamingHandlersIds.contains(id)){
			return this.streamingHandlers.get(id);
		}
		else{
			throw new IOException("StreamingHandler with id ["+id+"] not exists");
		}
	}
	

	
	

	
	
	/* ******* BUFFER ******* */
	
	public void setONBuffering() throws IOException {
		
		// the option of buffering can be enabled only if the stream has not yet been accessed
		if(this.currentReadPositionFromStream>0){
			throw new IOException("You can not enable buffering after the stream has been accessed");
		}

		if(this.bufferEnabled){
			throw new IOException("The buffering is already enabled");
		}
		
		// enable buffering
		this.bufferEnabled = true;
		this.buffer = new ByteArrayOutputStream();

	}
	
	public void setOFFBuffering() throws IOException {
		this.setOFFBuffering(true);
	}
	public void setOFFBuffering(boolean releaseBufferReaded) throws IOException {
		
		//System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+")");
		
		if(this.bufferEnabled==false){
			throw new IOException("The buffering is not enabled");
		}
		
		// disable buffer
		this.bufferEnabled = false;
		if(releaseBufferReaded){
			//System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+") rilascio");
			this.contentCompleteReadedFromStream = null;
			if(this.buffer!=null){
				this.buffer.close();
				this.buffer = null;
				//System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+") rilasciato");
			}
		}

	}
	
	private void finalizeBuffer() throws IOException{
		
		if(this.bufferEnabled==false){
			throw new IOException("BufferMode is not enabled");
		}
		
		if(this.contentCompleteReadedFromStream!=null){
			throw new IOException("bufferingComplete already invoked");
		}
		
		this.buffer.flush();
		this.buffer.close();
		this.contentCompleteReadedFromStream=this.buffer.toByteArray();
		this.buffer=null; // G.C.
		
	}
	
	
	
	
	
	
	
	
	
	
	/* ******* SERIALIZE ******* */
	
	public void serialize(OutputStream out) throws IOException{
		this.serializeEngine(out,false);
	}
	
	public void serializeAndConsume(OutputStream out) throws IOException{
		this.serializeEngine(out,true);
	}
	
	public byte[] serialize() throws IOException{
		ByteArrayOutputStream bout = new ByteArrayOutputStream();
		this.serializeEngine(bout,false);
		bout.flush();
		bout.close();
		return bout.toByteArray();
	}
	
	public byte[] serializeAndConsume() throws IOException{
		ByteArrayOutputStream bout = new ByteArrayOutputStream();
		this.serializeEngine(bout,true);
		bout.flush();
		bout.close();
		return bout.toByteArray();
	}
	
	private void serializeEngine(OutputStream out,boolean consume) throws IOException{
		
		//System.out.println("@@@@@@@@ serializeEngine");
		
		// If the stream is already saved return byte array
		//if(this.bufferEnabled && this.contentCompleteReadedFromStream!=null){
		if(this.contentCompleteReadedFromStream!=null){
			out.write(this.contentCompleteReadedFromStream);
			return;
		}
		
		// If buffering is enabled, but the stream is not completely consumed, write the bytes stored in the buffer in the output stream
		//if(this.bufferEnabled && this.contentCompleteReadedFromStream==null){
		if(this.buffer!=null){
			this.buffer.flush();
			if(this.buffer.size()>0){
				out.write(this.buffer.toByteArray());
			}
		}
		
		if(consume){
			if(this.bufferEnabled){
				this.setOFFBuffering();
			}
			else if(this.buffer!=null){
				this.buffer.close();
				this.buffer = null;
			}
		}
		
		// Conclude to consume the stream remaining
		// If buffering is enabled all bytes read will stored in the buffer
		byte[] buffer = new byte[1024];
		int byteRead = this.read_engine(buffer, 0, buffer.length, false);
		while ( byteRead != -1 ){
			out.write(buffer,0,byteRead);
			byteRead = this.read_engine(buffer, 0, buffer.length, false);
		}
	}
	

	
	

}