GestorePolicyAttiveInMemory.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2024 Link.it srl (https://link.it).
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */
package org.openspcoop2.pdd.core.controllo_traffico.policy.driver;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

import javax.activation.FileDataSource;

import org.openspcoop2.core.controllo_traffico.AttivazionePolicy;
import org.openspcoop2.core.controllo_traffico.ConfigurazionePolicy;
import org.openspcoop2.core.controllo_traffico.beans.ActivePolicy;
import org.openspcoop2.core.controllo_traffico.beans.ConfigurazioneControlloTraffico;
import org.openspcoop2.core.controllo_traffico.beans.DatiCollezionati;
import org.openspcoop2.core.controllo_traffico.beans.DatiTransazione;
import org.openspcoop2.core.controllo_traffico.beans.IDUnivocoGroupByPolicy;
import org.openspcoop2.core.controllo_traffico.beans.UniqueIdentifierUtilities;
import org.openspcoop2.core.controllo_traffico.constants.TipoRisorsa;
import org.openspcoop2.core.controllo_traffico.driver.IGestorePolicyAttive;
import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreads;
import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreadsInMemory;
import org.openspcoop2.core.controllo_traffico.driver.PolicyException;
import org.openspcoop2.core.controllo_traffico.driver.PolicyGroupByActiveThreadsType;
import org.openspcoop2.core.controllo_traffico.driver.PolicyNotFoundException;
import org.openspcoop2.core.controllo_traffico.driver.PolicyShutdownException;
import org.openspcoop2.core.controllo_traffico.utils.serializer.JaxbDeserializer;
import org.openspcoop2.core.controllo_traffico.utils.serializer.JaxbSerializer;
import org.openspcoop2.pdd.config.DynamicClusterManager;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.HazelcastManager;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedLocalCache;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCache;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutAsync;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutSync;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNoCache;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedReplicatedMap;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.PolicyGroupByActiveThreadsDistributedRedis;
import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.RedissonManager;
import org.openspcoop2.pdd.services.OpenSPCoop2Startup;
import org.openspcoop2.protocol.basic.Costanti;
import org.openspcoop2.protocol.sdk.state.IState;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.io.ZipUtilities;
import org.openspcoop2.utils.resources.FileSystemUtilities;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;

/**     
 * GestorePolicyAttiveInMemory
 *
 * @author Poli Andrea (poli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */
public class GestorePolicyAttiveInMemory implements IGestorePolicyAttive {


	/** 
	 * Threads allocati sulle Policy. La chiave รจ l'active-policy-id
	 **/
	private Map<String, IPolicyGroupByActiveThreadsInMemory> mapActiveThreadsPolicy = 
			new HashMap<String, IPolicyGroupByActiveThreadsInMemory>();
	private final org.openspcoop2.utils.Semaphore lock = new org.openspcoop2.utils.Semaphore("GestorePolicyAttiveInMemory");
	
	private static final String IMPL_DESCR = "Implementazione InMemory IGestorePolicyAttive";
	public static String getImplDescr(){
		return IMPL_DESCR;
	}
	
	private Logger log;
	private PolicyGroupByActiveThreadsType type;
	private boolean useCountersWithLock = false;
	@Override
	public void initialize(Logger log, boolean isStartupGovWay, PolicyGroupByActiveThreadsType type, Object ... params) throws PolicyException{
		this.log = log;
		this.type = type;
		if(this.type==null) {
			this.type = PolicyGroupByActiveThreadsType.LOCAL;
		}
		
		switch (this.type) {
		case LOCAL:
			break;
		case LOCAL_DIVIDED_BY_NODES:
			if(!DynamicClusterManager.isInitialized()) {
				try {
					DynamicClusterManager.initStaticInstance();
					DynamicClusterManager.getInstance().setRateLimitingGestioneCluster(true);
					DynamicClusterManager.getInstance().register(log);
					OpenSPCoop2Startup.startTimerClusterDinamicoThread();
				}catch(Exception e) {
					throw new PolicyException(e.getMessage(),e);
				}
			}
			else {
				try {
					DynamicClusterManager.getInstance().setRateLimitingGestioneCluster(true);
				}catch(Exception e) {
					throw new PolicyException(e.getMessage(),e);
				}
			}
			break;
			
		case DATABASE:
			break;
		
		case HAZELCAST_MAP:
		case HAZELCAST_NEAR_CACHE:
		case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
		case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
		case HAZELCAST_PNCOUNTER:
		case HAZELCAST_ATOMIC_LONG:
		case HAZELCAST_ATOMIC_LONG_ASYNC:
		case HAZELCAST_REPLICATED_MAP:
			HazelcastManager.getInstance(this.type);
			break;
		case HAZELCAST_LOCAL_CACHE:
			HazelcastManager.getInstance(this.type);
			if(!OpenSPCoop2Startup.isStartedTimerClusteredRateLimitingLocalCache()) {
				try {
					OpenSPCoop2Startup.startTimerClusteredRateLimitingLocalCache(this);
				}catch(Exception e) {
					throw new PolicyException(e.getMessage(),e);
				}
			}
			break;

		case REDISSON_MAP:
		case REDISSON_ATOMIC_LONG:
		case REDISSON_LONGADDER:
			
			boolean throwInitializingException = true;
			if(isStartupGovWay) {
				throwInitializingException = OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRedis_throwExceptionIfRedisNotReady();
			}
			
			try {
				RedissonManager.getRedissonClient(throwInitializingException);
			}catch(Exception e) {
				throw new PolicyException(e.getMessage(),e);
			}
			break;
		}
		
		if(this.type.isHazelcastCounters() || this.type.isRedisCounters()) {
			this.useCountersWithLock=OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRemoteCountersUseLocalLock();
		}
	}
	
	private boolean isStop = false;
	
	
	@Override
	public PolicyGroupByActiveThreadsType getType() {
		return this.type;
	}
	
	@Override
	public IPolicyGroupByActiveThreads getActiveThreadsPolicy(ActivePolicy activePolicy,DatiTransazione datiTransazione, Object state) throws PolicyShutdownException,PolicyException {		
		
		String uniqueIdMap = UniqueIdentifierUtilities.getUniqueId(activePolicy.getInstanceConfiguration());
				
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("getActiveThreadsPolicy(ActivePolicy)");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			IPolicyGroupByActiveThreadsInMemory active = null;
			//System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] contains["+this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)+"]...");
			if(this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)){
				active = this.mapActiveThreadsPolicy.get(uniqueIdMap);
				//System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] GET");
			}
			else{
				active = newPolicyGroupByActiveThreadsInMemory(activePolicy, uniqueIdMap, datiTransazione, state);
				this.mapActiveThreadsPolicy.put(uniqueIdMap, active);
				//System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] CREATE");
			}
			return active;
		}finally {
			this.lock.release("getActiveThreadsPolicy(ActivePolicy)");
		}
	}
	@Override
	public IPolicyGroupByActiveThreads getActiveThreadsPolicy(String uniqueIdMap) throws PolicyShutdownException,PolicyException,PolicyNotFoundException { // usata per la remove
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("getActiveThreadsPolicy(uniqueIdMap)");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			IPolicyGroupByActiveThreads active = null;
			//System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] contains["+this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)+"]...");
			if(this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)){
				active = this.mapActiveThreadsPolicy.get(uniqueIdMap);
				//System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] GET");
			}
			else{
				throw new PolicyNotFoundException("ActivePolicy ["+uniqueIdMap+"] notFound");
			}
			return active;
		}finally{
			this.lock.release("getActiveThreadsPolicy(uniqueIdMap)");
		}
	}
	
	@Override
	public long sizeActivePolicyThreads(boolean sum) throws PolicyShutdownException,PolicyException{
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("sizeActivePolicyThreads");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			if(sum){
				long sumLong = 0;
				if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
					for (String idPolicy : this.mapActiveThreadsPolicy.keySet()) {
						sumLong = sumLong +this.mapActiveThreadsPolicy.get(idPolicy).getActiveThreads();
					}
				}
				return sumLong;
			}else{
				return this.mapActiveThreadsPolicy.size();
			}
		}finally {
			this.lock.release("sizeActivePolicyThreads");
		}
	}
	
	@Override
	public String printKeysPolicy(String separator) throws PolicyShutdownException, PolicyException{
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("printKeysPolicy");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			StringBuilder bf = new StringBuilder();
			if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
				int i = 0;
				for (String idPolicy : this.mapActiveThreadsPolicy.keySet()) {
					String key = idPolicy;
					if(i>0){
						bf.append(separator);
					}
					bf.append("Cache-"+this.type+"["+i+"]=["+key+"]");
					i++;
				}
			}
			return bf.toString();
		}finally {
			this.lock.release("printKeysPolicy");
		}
	}
	
	@Override
	public String printInfoPolicy(String id, String separatorGroups) throws PolicyShutdownException,PolicyException,PolicyNotFoundException{
		IPolicyGroupByActiveThreadsInMemory activeThreads = (IPolicyGroupByActiveThreadsInMemory) this.getActiveThreadsPolicy(id);	
		try{
			return activeThreads.printInfos(this.log, separatorGroups);
		}catch(Exception e){
			throw new PolicyException(e.getMessage(),e);
		}
	}
	
	@Override
	public void removeActiveThreadsPolicy(String idActivePolicy) throws PolicyShutdownException, PolicyException{
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("removeActiveThreadsPolicy");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			if(this.mapActiveThreadsPolicy.containsKey(idActivePolicy)){
				this.mapActiveThreadsPolicy.remove(idActivePolicy);
			}
		}finally {
			this.lock.release("removeActiveThreadsPolicy");
		}
	}
	
	@Override
	public void removeActiveThreadsPolicyUnsafe(String idActivePolicy) throws PolicyShutdownException,PolicyException{
		if(this.isStop){
			throw new PolicyShutdownException("Policy Manager shutdown");
		}
		
		IPolicyGroupByActiveThreadsInMemory policy = this.mapActiveThreadsPolicy.remove(idActivePolicy);
		if(policy!=null) {
			try {
				policy.remove();
			}catch(Throwable e) {
				this.log.error("removeActiveThreadsPolicyUnsafe failed: "+e.getMessage(),e);
			}
		}
	}
	
	@Override
	public void removeAllActiveThreadsPolicy() throws PolicyShutdownException, PolicyException{
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("removeAllActiveThreadsPolicy");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			this.mapActiveThreadsPolicy.clear();
		}finally {
			this.lock.release("removeAllActiveThreadsPolicy");
		}
	}
	
	@Override
	public void resetCountersActiveThreadsPolicy(String idActivePolicy) throws PolicyShutdownException, PolicyException{
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("resetCountersActiveThreadsPolicy");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			if(this.mapActiveThreadsPolicy.containsKey(idActivePolicy)){
				this.mapActiveThreadsPolicy.get(idActivePolicy).resetCounters();
			}
		}finally {
			this.lock.release("resetCountersActiveThreadsPolicy");
		}
	}
	
	@Override
	public void resetCountersAllActiveThreadsPolicy() throws PolicyShutdownException, PolicyException{
		//synchronized (this.mapActiveThreadsPolicy) {
		this.lock.acquireThrowRuntime("resetCountersAllActiveThreadsPolicy");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			
			if(this.mapActiveThreadsPolicy.size()>0){
				for (String key : this.mapActiveThreadsPolicy.keySet()) {
					this.mapActiveThreadsPolicy.get(key).resetCounters();
				}
			}
		}finally {
			this.lock.release("resetCountersAllActiveThreadsPolicy");
		}
	}
	
	public Set<Entry<String, IPolicyGroupByActiveThreadsInMemory>> entrySet() throws PolicyShutdownException, PolicyException{
		Set<Entry<String, IPolicyGroupByActiveThreadsInMemory>> activeThreadsPolicies;
		
		this.lock.acquireThrowRuntime("updateLocalCacheMap");
		try {
			
			if(this.isStop){
				throw new PolicyShutdownException("Policy Manager shutdown");
			}
			activeThreadsPolicies = this.mapActiveThreadsPolicy.entrySet();
		} finally {
			this.lock.release("updateLocalCacheMap");
		}
		
		return activeThreadsPolicies;
	}
	
	
	// ---- Per salvare
	
	private static final String ZIP_POLICY_PREFIX = "policy-";
	private static final String ZIP_POLICY_ID_ACTIVE_SUFFIX = "-id-active.txt";
	private static final String ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX = "ConfigurazionePolicy.xml";
	private static final String ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX = "AttivazionePolicy.xml";
	private static final String ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX = "-id-datiCollezionati.txt";
	private static final String ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX = "-datiCollezionati.txt";
	
	@Override
	public void serialize(OutputStream out) throws PolicyException{
				
		//synchronized (this.mapActiveThreadsPolicy) {
		try {
			this.lock.acquireThrowRuntime("serialize");
			
			if(this.isStop){
				throw new PolicyException("Already serialized");
			}
			this.isStop = true;
			
			if(this.mapActiveThreadsPolicy==null || this.mapActiveThreadsPolicy.size()<=0){
				return;
			}
			
			ZipOutputStream zipOut = null;
			try{
				zipOut = new ZipOutputStream(out);

				String rootPackageDir = "";
				// Il codice dopo fissa il problema di inserire una directory nel package.
				// Commentare la riga sotto per ripristinare il vecchio comportamento.
				rootPackageDir = Costanti.OPENSPCOOP2_ARCHIVE_ROOT_DIR+File.separatorChar;
				
				// indice 
				int index = 1;
				
				// Chiavi possiedono la policy id
				for (String idActivePolicy : this.mapActiveThreadsPolicy.keySet()) {
					
					// Id File
					String idFileActivePolicy = ZIP_POLICY_PREFIX+index;
					
					// File contenente l'identificativo della policy attivata
					String nomeFile = idFileActivePolicy+ZIP_POLICY_ID_ACTIVE_SUFFIX;
					zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
					zipOut.write(idActivePolicy.getBytes());
					
					// GroupByThread
					IPolicyGroupByActiveThreadsInMemory active = this.mapActiveThreadsPolicy.get(idActivePolicy);
					if(active!=null){
						
						ActivePolicy activePolicy = active.getActivePolicy();
						JaxbSerializer serializer = new JaxbSerializer();
						
						byte[] cPolicy = null;
						if(activePolicy.getConfigurazionePolicy()!=null){
							try {
								cPolicy = serializer.toByteArray(activePolicy.getConfigurazionePolicy());
							}catch(Throwable t) {
								this.log.error("["+this.type+"] Serializzazione configurazione policy '"+activePolicy.getConfigurazionePolicy().getIdPolicy()+"' fallita: "+t.getMessage(),t);
							}
							if(cPolicy==null) {
								continue;
							}
						}
						
						byte[] aPolicy = null;
						if(activePolicy.getInstanceConfiguration()!=null){
							try {
								aPolicy = serializer.toByteArray(activePolicy.getInstanceConfiguration());
							}catch(Throwable t) {
								this.log.error("["+this.type+"] Serializzazione attivazione policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
							}
							if(aPolicy==null) {
								continue;
							}
						}
						
						
						// ConfigurazionePolicy
						if(activePolicy.getConfigurazionePolicy()!=null){
							nomeFile = idFileActivePolicy+File.separatorChar+ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX;
							zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
							zipOut.write(cPolicy);
						}
						
						// AttivazionePolicy
						if(activePolicy.getInstanceConfiguration()!=null){
							nomeFile = idFileActivePolicy+File.separatorChar+ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX;
							zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
							zipOut.write(aPolicy);
						}
						
						Map<IDUnivocoGroupByPolicy, DatiCollezionati> map = active.getMapActiveThreads();
						if(map!=null && map.size()>0){
							
							// indice 
							int indexDatoCollezionato = 1;
							
							// Chiavi dei raggruppamenti
							for (IDUnivocoGroupByPolicy idUnivocoGroupByPolicy : map.keySet()) {
								
								// Id Raggruppamento
								String idFileRaggruppamento = idFileActivePolicy+File.separatorChar+"groupBy"+File.separatorChar+"groupBy-"+indexDatoCollezionato;
								
								String id = null;
								try {
									id = IDUnivocoGroupByPolicy.serialize(idUnivocoGroupByPolicy);
								}catch(Throwable t) {
									this.log.error("["+this.type+"] Serializzazione idUnivocoGroupByPolicy ("+idUnivocoGroupByPolicy+") della policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
								}
								if(id==null) {
									continue;
								}
								
								DatiCollezionati datiCollezionati = map.get(idUnivocoGroupByPolicy);
								String dati = null;
								try {
									dati = DatiCollezionati.serialize(datiCollezionati);
								}catch(Throwable t) {
									this.log.error("["+this.type+"] Serializzazione dati per idUnivocoGroupByPolicy ("+idUnivocoGroupByPolicy+") della policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
								}
								if(dati==null) {
									continue;
								}
								
								// File contenente l'identificativo del raggruppamento
								nomeFile = idFileRaggruppamento+ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX;
								zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
								zipOut.write(id.getBytes());
								
								// DatiCollezionati
								// NOTA: l'ulteriore directory serve a garantire il corretto ordine di ricostruzione
								nomeFile = idFileRaggruppamento+File.separatorChar+"dati"+ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX;
								zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
								zipOut.write(dati.getBytes());
								
								// increment
								indexDatoCollezionato++;
							}
							
						}
					}
					
					// increment
					index++;
				}
				
				zipOut.flush();

			}catch(Exception e){
				throw new PolicyException(e.getMessage(),e);
			}finally{
				try{
					if(zipOut!=null)
						zipOut.close();
				}catch(Exception eClose){}
			}

		}finally {
			this.lock.release("serialize");
		}
	}
	
	@Override
	public void initialize(InputStream in,ConfigurazioneControlloTraffico configurazioneControlloTraffico) throws PolicyException{
		
		//synchronized (this.mapActiveThreadsPolicy) {
		try {
			this.lock.acquireThrowRuntime("initialize");
			
			if(in==null){
				return;
			}
			
			File f = null;
			ZipFile zipFile = null;
			String entryName = null;
			try{
				
				// Leggo InputStream
				byte [] bytesIn = Utilities.getAsByteArray(in);
				in.close();
				in = null;
				if(bytesIn==null || bytesIn.length<=0){
					return;
				}
				f = FileSystemUtilities.createTempFile("controlloTraffico", ".tmp");
				FileSystemUtilities.writeFile(f, bytesIn);
				
				// Leggo Struttura ZIP
				try {
					zipFile = new ZipFile(f);
				}catch(Throwable t) {
					this.log.error("Inizializzazione immagine ControlloTraffico precedente allo shutdown non ripristinabile, immagine corrotta: "+t.getMessage(),t);
					return;
				}
				
				JaxbDeserializer deserializer = new JaxbDeserializer();
				
				String rootPackageDir = Costanti.OPENSPCOOP2_ARCHIVE_ROOT_DIR+File.separatorChar;
				
				String rootDir = null;
				
				String idActivePolicy = null;
				ConfigurazionePolicy configurazionePolicy = null;
				AttivazionePolicy attivazionePolicy = null;
				Map<IDUnivocoGroupByPolicy, DatiCollezionati> map = null;
				
				IDUnivocoGroupByPolicy idDatiCollezionati = null;
				
				Iterator<ZipEntry> it = ZipUtilities.entries(zipFile, true);
				while (it.hasNext()) {
					ZipEntry zipEntry = (ZipEntry) it.next();
					entryName = ZipUtilities.operativeSystemConversion(zipEntry.getName());
					
					//System.out.println("FILE NAME:  "+entryName);
					//System.out.println("SIZE:  "+entry.getSize());

					// Il codice dopo fissa il problema di inserire una directory nel package.
					// Commentare la riga sotto per ripristinare il vecchio comportamento.
					if(rootDir==null){
						// Calcolo ROOT DIR
						rootDir=ZipUtilities.getRootDir(entryName);
					}
					
					if(zipEntry.isDirectory()) {
						continue; // directory
					}
					else {
						FileDataSource fds = new FileDataSource(entryName);
						String nome = fds.getName();
						String tipo = nome.substring(nome.lastIndexOf(".")+1,nome.length()); 
						tipo = tipo.toUpperCase();
						//System.out.println("VERIFICARE NAME["+nome+"] TIPO["+tipo+"]");
						
						InputStream inputStream = zipFile.getInputStream(zipEntry);
						byte[]content = Utilities.getAsByteArray(inputStream);
						
						try{
							
							if(entryName.startsWith((rootPackageDir+ZIP_POLICY_PREFIX)) ){
								
								if(entryName.endsWith(ZIP_POLICY_ID_ACTIVE_SUFFIX)){
									
									if(idActivePolicy!=null){
										// salvo precedente immagine
										this.mapActiveThreadsPolicy.put(idActivePolicy, 
												this.buildPolicyGroupByActiveThreads(configurazionePolicy, attivazionePolicy, map, configurazioneControlloTraffico));
										//System.out.println("@@@ RICOSTRUITO ID ACTIVE POLICY ["+idActivePolicy+"]");
										idActivePolicy = null;
										configurazionePolicy = null;
										attivazionePolicy = null;
										map = null;
										idDatiCollezionati = null;
									}
									
									idActivePolicy = new String(content);
									map=new HashMap<IDUnivocoGroupByPolicy, DatiCollezionati>();
									
									//System.out.println("ENTRY ["+idActivePolicy+"] NUOVO ID ["+entryName+"]");
									
								}
								else if(entryName.endsWith(ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX)){
								
									configurazionePolicy = deserializer.readConfigurazionePolicy(content);
									
									//System.out.println("ENTRY ["+idActivePolicy+"] CONFIGURAZIONE POLICY ["+entryName+"]");
									
								}
								else if(entryName.endsWith(ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX)){
									
									attivazionePolicy = deserializer.readAttivazionePolicy(content);
									
									//System.out.println("ENTRY ["+idActivePolicy+"] ATTIVAZIONE POLICY ["+entryName+"]");
									
								}
								else if(entryName.endsWith(ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX)){
									
									idDatiCollezionati  = IDUnivocoGroupByPolicy.deserialize( new String(content) );
									
									//System.out.println("ENTRY ["+idActivePolicy+"] ID DATI COLLEZIONATI POLICY ["+entryName+"]");
									
								}
								else if(entryName.endsWith(ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX)){
									
									if(idDatiCollezionati==null){
										throw new Exception("Identificativo di group by not found");
									}
									map.put(idDatiCollezionati, DatiCollezionati.deserialize( new String(content) ));
																		
									//System.out.println("ENTRY ["+idActivePolicy+"] DATI COLLEZIONATI POLICY ["+entryName+"]");
									
								}
								
							}
							else{
								throw new Exception("Entry unknown");
							}
							
						}finally{
							try{
								if(inputStream!=null){
									inputStream.close();
								}
							}catch(Exception eClose){}
						}
					}
					
				}
				
				if(idActivePolicy!=null){
					// salvo precedente immagine ?
					this.mapActiveThreadsPolicy.put(idActivePolicy, 
							this.buildPolicyGroupByActiveThreads(configurazionePolicy, attivazionePolicy, map, configurazioneControlloTraffico));
					//System.out.println("@@@ RICOSTRUITO FINALE ID ACTIVE POLICY ["+idActivePolicy+"]");
					idActivePolicy = null;
					configurazionePolicy = null;
					attivazionePolicy = null;
					map = null;
					idDatiCollezionati = null;
				}
				
			}catch(Exception e){
				throw new PolicyException("["+entryName+"] "+e.getMessage(),e);
			}
			finally{
				try{
					if(zipFile!=null)
						zipFile.close();
				}catch(Exception eClose){}
				try{
					if(f!=null) {
						if(!f.delete()) {
							// ignore
						}
					}
				}catch(Exception eClose){}
				try{
					if(in!=null)
						in.close();
				}catch(Exception eClose){}
			}
			
			

		}finally {
			this.lock.release("initialize");
		}
	}
	
	@Override
	public void cleanOldActiveThreadsPolicy() throws PolicyException{
		this.lock.acquireThrowRuntime("cleanOldActiveThreadsPolicy");
		try {
			if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
				for (String uniqueIdMap : this.mapActiveThreadsPolicy.keySet()) {
					for (PolicyGroupByActiveThreadsType tipo : GestorePolicyAttive.getTipiGestoriAttivi()) {
						if(!tipo.equals(this.type)) {
							//System.out.println("======== RIPULISCO DALLA PARTENZA in '"+tipo+"' ["+uniqueIdMap+"] !!!");
							GestorePolicyAttive.getInstance(tipo).removeActiveThreadsPolicyUnsafe(uniqueIdMap);
						}
					}
				}
			}
		}catch(Exception e){
			throw new PolicyException(e.getMessage(),e);
		}	
		finally {
			this.lock.release("cleanOldActiveThreadsPolicy");
		}
	}
	
	private IPolicyGroupByActiveThreadsInMemory buildPolicyGroupByActiveThreads(ConfigurazionePolicy configurazionePolicy,
			AttivazionePolicy attivazionePolicy, Map<IDUnivocoGroupByPolicy, DatiCollezionati> map,
			ConfigurazioneControlloTraffico configurazioneControlloTraffico) throws Exception{
		
		if(configurazionePolicy==null){
			throw new PolicyException("ConfigurazionePolicy non presente");
		}
		if(attivazionePolicy==null){
			throw new PolicyException("AttivazionePolicy non presente");
		}
		if(configurazioneControlloTraffico==null){
			throw new PolicyException("ConfigurazioneControlloTraffico non presente");
		}
		
		ActivePolicy activePolicy = new ActivePolicy();
		activePolicy.setConfigurazioneControlloTraffico(configurazioneControlloTraffico);
		activePolicy.setConfigurazionePolicy(configurazionePolicy);
		activePolicy.setInstanceConfiguration(attivazionePolicy);
		activePolicy.setTipoRisorsaPolicy(TipoRisorsa.toEnumConstant(configurazionePolicy.getRisorsa(), true));		
		
		String uniqueIdMap = UniqueIdentifierUtilities.getUniqueId(activePolicy.getInstanceConfiguration());
		IPolicyGroupByActiveThreadsInMemory p = newPolicyGroupByActiveThreadsInMemory(activePolicy, uniqueIdMap,
				null, null);
		
		if(map!=null && map.size()>0){
			for (IDUnivocoGroupByPolicy id : map.keySet()) {
				map.get(id).initActiveRequestCounter();
			}
			p.initMap(map);
		}
		
		return p;
		
	}
	
	private IPolicyGroupByActiveThreadsInMemory newPolicyGroupByActiveThreadsInMemory(ActivePolicy activePolicy, String uniqueIdMap,
			DatiTransazione datiTransazione, Object state) throws PolicyException {
		switch (this.type) {
		case LOCAL:
			return new PolicyGroupByActiveThreads(activePolicy, this.type);
		case LOCAL_DIVIDED_BY_NODES:
			return new PolicyGroupByActiveThreads(activePolicy, this.type);
		case DATABASE:
			return new PolicyGroupByActiveThreadsDB(activePolicy, this.type, uniqueIdMap, 
					(state!=null && state instanceof IState) ? ((IState)state) : null, 
					datiTransazione!=null ? datiTransazione.getDominio() : null, 
				    datiTransazione!=null ? datiTransazione.getIdTransazione() : null);
		case HAZELCAST_LOCAL_CACHE:
			return new PolicyGroupByActiveThreadsDistributedLocalCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
		case HAZELCAST_NEAR_CACHE:
			return new PolicyGroupByActiveThreadsDistributedNearCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
		case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
			return new PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutSync(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
		case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
			return new PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutAsync(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
		case HAZELCAST_MAP:
			return new PolicyGroupByActiveThreadsDistributedNoCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
		case HAZELCAST_REPLICATED_MAP:
			return new PolicyGroupByActiveThreadsDistributedReplicatedMap(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  	  	case HAZELCAST_PNCOUNTER:
  	  	case HAZELCAST_ATOMIC_LONG:
  	  	case HAZELCAST_ATOMIC_LONG_ASYNC:
  	  	case REDISSON_ATOMIC_LONG:
  	  	case REDISSON_LONGADDER:
  	  		 BuilderDatiCollezionatiDistributed builder = BuilderDatiCollezionatiDistributed.getBuilder(this.type);
  	  		 if(this.useCountersWithLock) {
  	  			 return new PolicyGroupByActiveThreadsDistributedCountersWithLock(activePolicy, uniqueIdMap,  builder);
  	  		 }
  	  		 else {
  	  			 return new PolicyGroupByActiveThreadsDistributedCounters(activePolicy, uniqueIdMap,  builder);
  	  		 }
		case REDISSON_MAP:
			
			boolean throwInitializingException = true;
			RedissonClient redissonClient = null;
			try {
				redissonClient = RedissonManager.getRedissonClient(throwInitializingException);
			}catch(Exception e) {
				throw new PolicyException(e.getMessage(),e);
			}
			
			return new PolicyGroupByActiveThreadsDistributedRedis(activePolicy, uniqueIdMap, redissonClient);
		}
		throw new PolicyException("Unsupported type '"+this.type+"'");
	}
	
}