NotifierInputStream.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.utils.io.notifier;

  21. import java.io.ByteArrayOutputStream;
  22. import java.io.IOException;
  23. import java.io.InputStream;
  24. import java.io.OutputStream;
  25. import java.util.ArrayList;
  26. import java.util.HashMap;
  27. import java.util.List;
  28. import java.util.Map;

  29. import org.slf4j.Logger;

  30. import com.sun.xml.messaging.saaj.packaging.mime.internet.ContentType;
  31. import com.sun.xml.messaging.saaj.packaging.mime.internet.ParseException;

  32. /**
  33.  * NotifierInputStream
  34.  *
  35.  * @author Poli Andrea (apoli@link.it)
  36.  * @author $Author$
  37.  * @version $Rev$, $Date$
  38.  */
  39. public class NotifierInputStream extends InputStream {

  40.    
  41.     /* ****** VARIABLE ******* */
  42.    
  43.     /**
  44.      * Original Input Stream
  45.      */
  46.     private InputStream is;
  47.     /**
  48.      * Original Input Stream Consumed
  49.      */
  50.     private boolean isOriginalInputStreamConsumed;
  51.    
  52.     /**
  53.      * Current position in this input stream
  54.      */
  55.     private int currentReadPositionFromStream = 0;
  56.    
  57.     /**
  58.      * Content type of the stream
  59.      */
  60.     private ContentType contentType = null;
  61.    
  62.     /**
  63.      * Indication if the buffer is enabled
  64.      */
  65.     private boolean bufferEnabled = false;
  66.     public boolean isBufferEnabled() {
  67.         return this.bufferEnabled;
  68.     }

  69.     /**
  70.      * Buffer
  71.      */
  72.     private ByteArrayOutputStream buffer = null;
  73.     /**
  74.      * Bytes present in the stream after the completion of reading
  75.      */
  76.     private byte[] contentCompleteReadedFromStream = null;  
  77.    
  78.     /**
  79.      * StreamingHandler
  80.      */
  81.     private Map<String,StreamingHandler> streamingHandlers = new HashMap<String, StreamingHandler>();
  82.     private List<String> streamingHandlersIds = new ArrayList<>(); // Per preservare l'ordine di inserimento

  83.    
  84.     /**
  85.      * throwStreamingHandlerException
  86.      */
  87.     private boolean throwStreamingHandlerException = false;
  88.    
  89.     /**
  90.      * InputStream is closed
  91.      */
  92.     private boolean isClosed = false;
  93.    
  94.    
  95.     /**
  96.      * Logger
  97.      */
  98.     private Logger log = null;
  99.    
  100.    
  101.     /* ****** CONSTRUCTOR ******* */
  102.    
  103.     public NotifierInputStream(InputStream is,String contentType,NotifierInputStreamParams params) throws IOException, ParseException{
  104.        
  105.         // Set content type of the stream
  106.         //try{
  107.         if(contentType!=null){
  108.             this.contentType = new ContentType(contentType);
  109.         }
  110.         //}catch(Exception e){
  111.         //  throw new IOException(e.getMessage(),e);
  112.         //}
  113.        
  114.         // Set original input stream
  115.         if(is==null){
  116.             throw new ParseException("Original InputStream not defined in args");
  117.         }
  118.         this.is = is;
  119.         //System.out.println("@@@@@@@@ NotifierInputStream: "+is.getClass().getName());
  120.                
  121.         // Initialize Streaming Handler List
  122.         if(params.sizeStreamingHandlers()>0){
  123.             for (String streamingHandlerId : params.getStreamingHandlerIds()) {
  124.                 try{
  125.                     //System.out.println("@@@@@@@@ INIT HANDLERS");
  126.                     this.streamingHandlers.put(streamingHandlerId, params.getStreamingHandler(streamingHandlerId));
  127.                     this.streamingHandlersIds.add(streamingHandlerId);
  128.                 }catch(Exception e){
  129.                     throw new IOException("Streaming Handler initialization failed (id:"+streamingHandlerId+")");
  130.                 }
  131.             }
  132.         }
  133.        
  134.         // throwStreamingHandlerException
  135.         this.throwStreamingHandlerException = params.isThrowStreamingHandlerException();
  136.        
  137.         // Log
  138.         this.log = params.getLog();
  139.        
  140.         // Buffering
  141.         if(params.isBufferEnabled()){
  142.             this.setONBuffering();
  143.         }
  144.        
  145.     }
  146.    
  147.    
  148.    
  149.    
  150.    
  151.    
  152.     /* ****** GET ******* */

  153.     public ContentType getContentType() {
  154.         return this.contentType;
  155.     }


  156.    
  157.    
  158.    
  159.    
  160.     /* ******* INPUT STREAM INTERFACE METHODS ******* */
  161.    
  162.     @Override
  163.     public int read(byte[] b) throws IOException {
  164.         return this.read(b, 0, b.length);
  165.     }
  166.    
  167.    
  168.     @Override
  169.     public int read(byte[] b, int off, int len) throws IOException {
  170.         return this.read_engine(b, off, len, true);
  171.     }
  172.    
  173. //  java.io.FileOutputStream fout = null;
  174. //  java.io.File f = null;
  175. //  private void debug(byte[] b, int off, int letti){
  176. //      try{
  177. //          if(this.fout==null){
  178. //              this.f = java.io.File.createTempFile("NotifierInputStream", "tmp");
  179. //              this.fout = new java.io.FileOutputStream(this.f);
  180. //              this.fout.write(b, off, letti);
  181. //          }
  182. //      }catch(Exception e){
  183. //          System.out.println("@@@@@@@@ DEBUG ERROR:"+e.getMessage());
  184. //      }
  185. //  }
  186. //  private void printFile(){
  187. //      try{
  188. //          this.fout.flush();
  189. //          this.fout.close();
  190. //      }catch(Exception e){
  191. //          System.out.println("@@@@@@@@ FILE ERROR:"+e.getMessage());
  192. //      }
  193. //      System.out.println("@@@@@@@@ FILE["+this.f.getAbsolutePath()+"]");
  194. //  }
  195.    
  196.     private int read_engine(byte[] b, int off, int len, boolean incrementCurrentReadPositionFromStream) throws IOException {
  197.        
  198. //      if(!incrementCurrentReadPositionFromStream){
  199. //          System.out.println("@@@@@@@@ READ FROM SERIALIZE b["+b.length+"] offset["+off+"] length["+len+"]");
  200. //      }
  201. //      else{
  202. //          System.out.println("@@@@@@@@ READ b["+b.length+"] offset["+off+"] length["+len+"]");
  203. //      }
  204.        
  205.         int readBytes = 0;
  206.         int offset = off;
  207.         try{
  208.            
  209.             if(this.isOriginalInputStreamConsumed){
  210.            
  211.                 //System.out.println("@@@@@@@@ IS COMPLITED.... ");
  212.                
  213.                 // The stream was completely consumed and the bytes are stored in the variable 'contentReadFromStream'
  214.                 if(this.contentCompleteReadedFromStream!=null){
  215.                    
  216. //                  System.out.println("@@@@@@@@ IS COMPLITED currentReadPositionFromStream["+this.currentReadPositionFromStream
  217. //                          +"] contentCompleteReadedFromStream["+this.contentCompleteReadedFromStream.length+"].... ");
  218.                    
  219.                     if(this.currentReadPositionFromStream>=this.contentCompleteReadedFromStream.length){
  220.                         // stream completed
  221. //                      System.out.println("@@@@@@@@ IS COMPLITED RETURN -1");
  222. //                      this.printFile();
  223.                         return -1;
  224.                     }
  225.                     else{
  226.                         int position = this.currentReadPositionFromStream;
  227.                         while( (position<this.contentCompleteReadedFromStream.length) && (readBytes<len) ){
  228.                            
  229.                             if(offset>=b.length){
  230.                                 throw new IndexOutOfBoundsException("Offset: "+offset+" , byte[] length: "+b.length);
  231.                             }
  232.                            
  233.                             b[offset] = this.contentCompleteReadedFromStream[position];
  234.                             readBytes++;
  235.                             offset++;
  236.                         }
  237. //                      System.out.println("@@@@@@@@ IS COMPLITED RETURN "+readBytes+" BYTES");
  238. //                      this.debug(b, offset, readBytes);
  239.                         return readBytes;
  240.                     }
  241.                    
  242.                 }
  243.                
  244.                 // The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled
  245.                 else{
  246.                     //throw new IOException("The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled");
  247.                     // NOTE: the engine can call read method more times ...
  248.                     return -1;
  249.                 }
  250.                
  251.             }
  252.            
  253.             // Reading in the progress of the stream
  254.             else{
  255.                                
  256.                 // performRead: enable dispatching to streaming handlers
  257.                 readBytes = performReadBytes(b,off,len);
  258.                
  259.                 // stream completed
  260.                 if(readBytes == -1){
  261.                     //System.out.println("@@@@@@@@ IS complite ");
  262.                     this.isOriginalInputStreamConsumed = true;
  263.                 }
  264.                
  265.                 // if enabled buffering, the byte read is saved
  266.                 if(this.bufferEnabled){
  267.                     if(this.isOriginalInputStreamConsumed){
  268.                         //System.out.println("@@@@@@@@ IS finalize! ");
  269.                         this.finalizeBuffer();
  270.                     }else{
  271.                         //System.out.println("@@@@@@@@ write offset["+offset+"] readBytes["+readBytes+"] ");
  272.                         this.buffer.write(b, offset, readBytes);
  273.                     }
  274.                 }
  275.                
  276. //              // Check if exists more bytes (questi codice non dovrebbe servire)
  277. //              int bytesMaxRead = len-off;
  278. //              if(readBytes<bytesMaxRead){
  279. //                  //System.out.println("@@@@@@@@ Check if exists more bytes ...");
  280. //                  int byteRead = read_engine(true);
  281. //                  //System.out.println("@@@@@@@@ Check if exists more bytes, return: "+byteRead);
  282. //                  if(byteRead!=-1){
  283. //                      //System.out.println("@@@@@@@@  Exists more byte, set return at position ["+readBytes+"]");
  284. //                      b[readBytes]=(byte)byteRead;
  285. //                      readBytes++;
  286. //                  }
  287. //              }
  288.                
  289.                 //System.out.println("@@@@@@@@ Return dopo Perform "+readBytes);
  290. //              if(readBytes == -1){
  291. //                  this.printFile();
  292. //              }else{
  293. //                  this.debug(b, offset, readBytes);
  294. //              }          
  295.                 return readBytes;
  296.             }
  297.         }finally{
  298.             if(incrementCurrentReadPositionFromStream){
  299.                 this.currentReadPositionFromStream=this.currentReadPositionFromStream+readBytes;
  300.             }
  301.         }
  302.     }
  303.    
  304.     private int performReadBytes(byte[] b, int off, int len) throws IOException {
  305.        
  306.         //System.out.println("@@@@@@@@ performReadBytes ...");
  307.        
  308.         int readBytes = 0;
  309.        
  310.         if(this.is==null){
  311.             //System.out.println("@@@@@@@@ return -1 Stream is null ...");
  312.             readBytes = -1;
  313.         }
  314.         else{
  315.             readBytes = this.is.read(b,off,len);
  316.         }
  317.         //System.out.println("@@@@@@@@ ["+readBytes+"] bytes read ...");
  318.        
  319.         // enable dispatching to streaming handlers
  320.         ByteArrayOutputStream bout = null;
  321.         for(String streamingHandlerId : this.streamingHandlersIds) {
  322.             //System.out.println("@@@@@@@@ Streaming handler ["+this.streamingHandlersIds.size()+"]");
  323.             StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
  324.             try{
  325.                 if(readBytes==-1){
  326.                     //System.out.println("@@@@@@@@ return -1 Dispatching end...");
  327.                     streamingHandler.end();
  328.                 }
  329.                 else{
  330.                     if(bout==null){
  331.                         bout = new ByteArrayOutputStream();
  332.                         bout.write(b, off, readBytes);
  333.                         bout.flush();
  334.                         bout.close();
  335.                     }
  336.                     //System.out.println("@@@@@@@@ return bytes "+bout.size()+" Dispatching ...");
  337.                     streamingHandler.feed(bout.toByteArray());
  338.                 }
  339.             }catch(Throwable e){
  340.                 if(this.log!=null){
  341.                     this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
  342.                 }
  343.                 if(this.throwStreamingHandlerException){
  344.                     throw new IOException("["+streamingHandlerId+"] "+e.getMessage());
  345.                 }
  346.             }
  347.         }
  348.                
  349.         return readBytes;
  350.     }
  351.    


  352.    
  353.     @Override
  354.     public int read() throws IOException {
  355.         return this.read_engine(true);
  356.     }
  357.     private int read_engine(boolean incrementCurrentReadPositionFromStream) throws IOException {
  358.         try{
  359.            
  360. //          if(!incrementCurrentReadPositionFromStream){
  361. //              System.out.println("@@@@@@@@ READ FROM SERIALIZE");
  362. //          }
  363. //          else{
  364. //              System.out.println("@@@@@@@@ READ");
  365. //          }
  366.            
  367.             if(this.isOriginalInputStreamConsumed){
  368.            
  369.                 //System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE.... ");
  370.                
  371.                 // The stream was completely consumed and the bytes are stored in the variable 'contentReadFromStream'
  372.                 if(this.contentCompleteReadedFromStream!=null){
  373.                    
  374. //                  System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE currentReadPositionFromStream["+this.currentReadPositionFromStream
  375. //                          +"] contentCompleteReadedFromStream["+this.contentCompleteReadedFromStream.length+"].... ");
  376.                    
  377.                     if(this.currentReadPositionFromStream>=this.contentCompleteReadedFromStream.length){
  378.                         // stream completed
  379. //                      System.out.println("@@@@@@@@ IS COMPLITED SINGLE BYTE RETURN -1");
  380. //                      this.printFile();
  381.                         return -1;
  382.                     }
  383.                     else{
  384. //                      System.out.println("@@@@@@@@ IS COMPLITED RETURN SINGLE BYTE ("+this.contentCompleteReadedFromStream[this.currentReadPositionFromStream]+") ");
  385. //                      this.debug(new byte[]{this.contentCompleteReadedFromStream[this.currentReadPositionFromStream]}, 0, 1);
  386.                         return this.contentCompleteReadedFromStream[this.currentReadPositionFromStream];
  387.                     }
  388.                    
  389.                 }
  390.                
  391.                 // The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled
  392.                 else{
  393.                     //throw new IOException("The stream has already been completely read. The error may be due to the use of the methods serialize without buffering enabled");
  394.                     // NOTE: the engine can call read method more times ...
  395.                     return -1;
  396.                 }
  397.                
  398.             }
  399.            
  400.             // Reading in the progress of the stream
  401.             else{
  402.                                
  403.                 // performRead: enable dispatching to streaming handlers
  404.                 int byteRead = performRead();
  405.            
  406.                 // stream completed
  407.                 if(byteRead == -1){
  408.                     //System.out.println("@@@@@@@@ IS complite SINGLE BYTE ");
  409.                     this.isOriginalInputStreamConsumed = true;
  410.                 }
  411.                
  412.                 // if enabled buffering, the byte read is saved
  413.                 if(this.bufferEnabled){
  414.                     if(this.isOriginalInputStreamConsumed){
  415.                         //System.out.println("@@@@@@@@ IS finalize! SINGLE BYTE");
  416.                         this.finalizeBuffer();
  417.                     }else{
  418.                         //System.out.println("@@@@@@@@ write SINGLE BYTE["+byteRead+"] ");
  419.                         this.buffer.write(byteRead);
  420.                     }
  421.                 }
  422.                
  423.                 //System.out.println("@@@@@@@@ Return dopo Perform SINGLE BYTE "+byteRead);
  424. //              if(byteRead == -1){
  425. //                  this.printFile();
  426. //              }else{
  427. //                  this.debug(new byte[]{(byte)byteRead}, 0, 1);
  428. //              }
  429.                 return byteRead;
  430.             }
  431.         }finally{
  432.             if(incrementCurrentReadPositionFromStream){
  433.                 this.currentReadPositionFromStream++;
  434.             }
  435.         }
  436.     }
  437.    
  438.     private int performRead() throws IOException {
  439.         int b = this.is.read();

  440.         // enable dispatching to streaming handlers
  441.         for(String streamingHandlerId : this.streamingHandlersIds) {
  442.             StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
  443.             try{
  444.                 if(b==-1){
  445.                     //System.out.println("@@@@@@@@ return -1 Dispatching end ...");
  446.                     streamingHandler.end();
  447.                 }
  448.                 else{
  449.                     //System.out.println("@@@@@@@@ return un byte Dispatching ...");
  450.                     streamingHandler.feed((byte)b);
  451.                 }
  452.             }catch(Throwable e){
  453.                 if(this.log!=null){
  454.                     this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
  455.                 }
  456.                 if(this.throwStreamingHandlerException){
  457.                     throw new IOException("["+streamingHandlerId+"] "+e.getMessage());
  458.                 }
  459.             }
  460.         }
  461.         return b;
  462.     }
  463.    
  464.     @Override
  465.     public void close() throws IOException {
  466.        
  467.         if(this.isClosed==false){
  468.        
  469.             if(this.is!=null)
  470.                 this.is.close();
  471.            
  472.             IOException streamingHandlerException = null;
  473.            
  474.             // enable dispatching to streaming handlers
  475.             for(String streamingHandlerId : this.streamingHandlersIds) {
  476.                 StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
  477.                 try{
  478.                     streamingHandler.closeResources();
  479.                 }catch(Throwable e){
  480.                     if(this.log!=null){
  481.                         this.log.error("["+streamingHandlerId+"] error occurs: "+e.getMessage(),e);
  482.                     }
  483.                     if(this.throwStreamingHandlerException){
  484.                         if(streamingHandlerException==null){
  485.                             // throw the first exception occurs
  486.                             streamingHandlerException = new IOException("["+streamingHandlerId+"] "+e.getMessage());
  487.                         }
  488.                     }
  489.                 }
  490.             }
  491.                    
  492.             this.isClosed = true;
  493.            
  494.             if(streamingHandlerException!=null){
  495.                 throw streamingHandlerException;
  496.             }
  497.         }
  498.     }
  499.    
  500.    
  501.    
  502.    
  503.    
  504.    
  505.     /* ******* HANDLERS ******* */
  506.    
  507.     /**
  508.      * adds a Streaming handler to our list
  509.      * @param handler handler to add
  510.      * @throws IOException
  511.      */
  512.     public void addStreamingHandler(StreamingHandler handler) throws IOException {
  513.        
  514.         //System.out.println("@@@@@@@@ addStreamingHandler currentReadPositionFromStream["+this.currentReadPositionFromStream+"] COMPLITED["+this.isCompleted+"]... ");
  515.        
  516.         // the option of buffering can be enabled only if the stream has not yet been accessed
  517.         if(this.currentReadPositionFromStream>0){
  518.             if(this.bufferEnabled==false)
  519.                 throw new IOException("You can not add handler after the stream has been accessed with buffering disabled");
  520.             else {
  521.                
  522.                 String id = handler.getID();
  523.                 if(this.streamingHandlersIds.contains(id)){
  524.                     throw new IOException("StreamingHandler with id ["+id+"] already exists");
  525.                 }
  526.                 this.streamingHandlers.put(id, handler);
  527.                 this.streamingHandlersIds.add(id);
  528.                                
  529.                 if(this.isOriginalInputStreamConsumed){
  530.                     //System.out.println("@@@@@@@@ addStreamingHandler FEED ["+this.contentCompleteReadedFromStream.length+"]");
  531.                     handler.feed(this.contentCompleteReadedFromStream);
  532.                    
  533.                     //System.out.println("@@@@@@@@ addStreamingHandler END");
  534.                     handler.end();
  535.                 }
  536.                 else{
  537.                     //System.out.println("@@@@@@@@ addStreamingHandler FEED ["+this.buffer.size()+"]");
  538.                     handler.feed(this.buffer.toByteArray());
  539.                 }
  540.             }
  541.         }
  542.        
  543.     }
  544.    
  545.     /**
  546.      * Given a StreamingHandler class, returns the corresponding handler, if one
  547.      * Useful in case of more handlers registered to this class
  548.      * @param clazz - the class which the needed handler belongs
  549.      * @return corresponding handler if there is one, otherwise null
  550.      * @throws IOException
  551.      */
  552.     @SuppressWarnings("unchecked")
  553.     public <T extends StreamingHandler> T getFirstStreamingHandlerByType(Class<T> clazz) throws IOException {
  554.         for(String streamingHandlerId : this.streamingHandlersIds) {
  555.             StreamingHandler streamingHandler = this.streamingHandlers.get(streamingHandlerId);
  556.             if (streamingHandler.getClass().equals(clazz))
  557.                 return (T) streamingHandler;
  558.         }
  559.         throw new IOException("StreamingHandler with type ["+clazz.getName()+"] not exists");
  560.     }

  561.     public StreamingHandler getStreamingHandler(String id) throws IOException{
  562.         if(this.streamingHandlersIds.contains(id)){
  563.             return this.streamingHandlers.get(id);
  564.         }
  565.         else{
  566.             throw new IOException("StreamingHandler with id ["+id+"] not exists");
  567.         }
  568.     }
  569.    

  570.    
  571.    

  572.    
  573.    
  574.     /* ******* BUFFER ******* */
  575.    
  576.     public void setONBuffering() throws IOException {
  577.        
  578.         // the option of buffering can be enabled only if the stream has not yet been accessed
  579.         if(this.currentReadPositionFromStream>0){
  580.             throw new IOException("You can not enable buffering after the stream has been accessed");
  581.         }

  582.         if(this.bufferEnabled){
  583.             throw new IOException("The buffering is already enabled");
  584.         }
  585.        
  586.         // enable buffering
  587.         this.bufferEnabled = true;
  588.         this.buffer = new ByteArrayOutputStream();

  589.     }
  590.    
  591.     public void setOFFBuffering() throws IOException {
  592.         this.setOFFBuffering(true);
  593.     }
  594.     public void setOFFBuffering(boolean releaseBufferReaded) throws IOException {
  595.        
  596.         //System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+")");
  597.        
  598.         if(this.bufferEnabled==false){
  599.             throw new IOException("The buffering is not enabled");
  600.         }
  601.        
  602.         // disable buffer
  603.         this.bufferEnabled = false;
  604.         if(releaseBufferReaded){
  605.             //System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+") rilascio");
  606.             this.contentCompleteReadedFromStream = null;
  607.             if(this.buffer!=null){
  608.                 this.buffer.close();
  609.                 this.buffer = null;
  610.                 //System.out.println("@@@@@@@@ setOFFBuffering("+releaseBufferReaded+") rilasciato");
  611.             }
  612.         }

  613.     }
  614.    
  615.     private void finalizeBuffer() throws IOException{
  616.        
  617.         if(this.bufferEnabled==false){
  618.             throw new IOException("BufferMode is not enabled");
  619.         }
  620.        
  621.         if(this.contentCompleteReadedFromStream!=null){
  622.             throw new IOException("bufferingComplete already invoked");
  623.         }
  624.        
  625.         this.buffer.flush();
  626.         this.buffer.close();
  627.         this.contentCompleteReadedFromStream=this.buffer.toByteArray();
  628.         this.buffer=null; // G.C.
  629.        
  630.     }
  631.    
  632.    
  633.    
  634.    
  635.    
  636.    
  637.    
  638.    
  639.    
  640.    
  641.     /* ******* SERIALIZE ******* */
  642.    
  643.     public void serialize(OutputStream out) throws IOException{
  644.         this.serializeEngine(out,false);
  645.     }
  646.    
  647.     public void serializeAndConsume(OutputStream out) throws IOException{
  648.         this.serializeEngine(out,true);
  649.     }
  650.    
  651.     public byte[] serialize() throws IOException{
  652.         ByteArrayOutputStream bout = new ByteArrayOutputStream();
  653.         this.serializeEngine(bout,false);
  654.         bout.flush();
  655.         bout.close();
  656.         return bout.toByteArray();
  657.     }
  658.    
  659.     public byte[] serializeAndConsume() throws IOException{
  660.         ByteArrayOutputStream bout = new ByteArrayOutputStream();
  661.         this.serializeEngine(bout,true);
  662.         bout.flush();
  663.         bout.close();
  664.         return bout.toByteArray();
  665.     }
  666.    
  667.     private void serializeEngine(OutputStream out,boolean consume) throws IOException{
  668.        
  669.         //System.out.println("@@@@@@@@ serializeEngine");
  670.        
  671.         // If the stream is already saved return byte array
  672.         //if(this.bufferEnabled && this.contentCompleteReadedFromStream!=null){
  673.         if(this.contentCompleteReadedFromStream!=null){
  674.             out.write(this.contentCompleteReadedFromStream);
  675.             return;
  676.         }
  677.        
  678.         // If buffering is enabled, but the stream is not completely consumed, write the bytes stored in the buffer in the output stream
  679.         //if(this.bufferEnabled && this.contentCompleteReadedFromStream==null){
  680.         if(this.buffer!=null){
  681.             this.buffer.flush();
  682.             if(this.buffer.size()>0){
  683.                 out.write(this.buffer.toByteArray());
  684.             }
  685.         }
  686.        
  687.         if(consume){
  688.             if(this.bufferEnabled){
  689.                 this.setOFFBuffering();
  690.             }
  691.             else if(this.buffer!=null){
  692.                 this.buffer.close();
  693.                 this.buffer = null;
  694.             }
  695.         }
  696.        
  697.         // Conclude to consume the stream remaining
  698.         // If buffering is enabled all bytes read will stored in the buffer
  699.         byte[] buffer = new byte[1024];
  700.         int byteRead = this.read_engine(buffer, 0, buffer.length, false);
  701.         while ( byteRead != -1 ){
  702.             out.write(buffer,0,byteRead);
  703.             byteRead = this.read_engine(buffer, 0, buffer.length, false);
  704.         }
  705.     }
  706.    

  707.    
  708.    

  709. }