NotifierStreamingHandler.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.handlers.notifier.engine;
import java.io.File;
import java.io.FileOutputStream;
import java.util.List;
import java.util.Map;
import org.openspcoop2.core.id.IDSoggetto;
import org.openspcoop2.core.transazioni.constants.TipoMessaggio;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.utils.UtilsException;
import org.openspcoop2.utils.io.notifier.unblocked.AbstractStreamingHandler;
import org.openspcoop2.utils.io.notifier.unblocked.ResultStreamingHandler;
import org.slf4j.Logger;
/**
* NotifierStreamingHandler
*
* @author Poli Andrea (poli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class NotifierStreamingHandler extends AbstractStreamingHandler {
private Throwable exception = null;
private String error = null;
private String idTransazione;
private TipoMessaggio tipoMessaggio;
private Map<String, List<String>> headerTrasporto;
private long idDumpConfigurazione;
private String contentType;
private NotifierCallback notifierCallback;
private IDSoggetto dominio;
public NotifierStreamingHandler(NotifierCallback notifierCallback, String idTransazione, TipoMessaggio tipoMessaggio,
Map<String, List<String>> headerTrasporto,
long idDumpConfigurazione,
String contentType, Logger log,
IDSoggetto dominio) throws Exception{
super(log, OpenSPCoop2Properties.getInstance().getDumpNonRealtimeInMemoryThreshold());
this.notifierCallback = notifierCallback; // Per i log
this.idTransazione = idTransazione;
this.tipoMessaggio = tipoMessaggio;
this.headerTrasporto = headerTrasporto;
this.contentType = contentType;
this.idDumpConfigurazione = idDumpConfigurazione;
this.dominio = dominio;
}
@Override
public boolean isPrematureEnd() throws UtilsException {
return (this.exception!=null);
}
@Override
public String getError() {
return this.error;
}
@Override
public Throwable getException() {
return this.exception;
}
@Override
public ResultStreamingHandler call() throws UtilsException {
try{
NotifierResultStreamingHandler result = new NotifierResultStreamingHandler();
OpenSPCoop2Properties op2Properties = OpenSPCoop2Properties.getInstance();
if(op2Properties.isDumpNonRealtimeDatabaseMode()){
this.notifierCallback.debug("Save on database.....");
// save on database
NotifierDump notifierDump = NotifierDump.getInstance();
this.notifierCallback.debug("Get Instance.....");
int executeUpdate = notifierDump.saveOnDatabase(this.notifierCallback, this.idTransazione, this.tipoMessaggio,
this.headerTrasporto,
this.idDumpConfigurazione, this.contentType, this, this.dominio);
this.notifierCallback.debug("Execute: "+executeUpdate);
result.setSaveOnFileSystem(false);
result.setExecuteUpdateRow(executeUpdate);
this.notifierCallback.debug("Save on database execute with row: "+executeUpdate);
}
else{
this.notifierCallback.debug("Save on fs.....");
// save on fs
// directory
File fDir =op2Properties.getDumpNonRealtimeRepository();
if(fDir.exists()==false){
throw new Exception("Directory ["+fDir.getAbsolutePath()+"] not exists");
}
if(fDir.canRead()==false){
throw new Exception("Directory ["+fDir.getAbsolutePath()+"] not readable");
}
if(fDir.canWrite()==false){
throw new Exception("Directory ["+fDir.getAbsolutePath()+"] not writable");
}
// messaggio
File f = new File(fDir, this.idTransazione+"_"+this.tipoMessaggio.toString()+".bin");
FileOutputStream fout = null;
//org.apache.commons.io.output.NullOutputStream fout = null;
try{
fout = new FileOutputStream(f);
//fout = new org.apache.commons.io.output.NullOutputStream();
// lettura e scrittura su file
byte[]buffer = new byte[4096];
int letti = 0;
while( (letti=this.read(buffer)) != -1 ){
fout.write(buffer, 0, letti);
}
}finally{
try{
if(fout!=null){
fout.flush();
}
}catch(Exception eClose){
// close
}
try{
if(fout!=null){
fout.close();
}
}catch(Exception eClose){
// close
}
}
// Risposta
result.setSaveOnFileSystem(true);
result.setFile(f);
this.notifierCallback.debug("Save on fs execute: "+f.getAbsolutePath());
}
return result;
}catch(Throwable e){
//this.notifierCallback.error("ERRORE HANDLER STREAMING :"+e.getMessage(),e);
this.exception = e;
this.error = e.getMessage();
throw new UtilsException(this.error,this.exception);
}
}
}