GestorePolicyAttiveInMemory.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.pdd.core.controllo_traffico.policy.driver;

  21. import java.io.File;
  22. import java.io.InputStream;
  23. import java.io.OutputStream;
  24. import java.util.HashMap;
  25. import java.util.Iterator;
  26. import java.util.Map;
  27. import java.util.Map.Entry;
  28. import java.util.Set;
  29. import java.util.zip.ZipEntry;
  30. import java.util.zip.ZipFile;
  31. import java.util.zip.ZipOutputStream;

  32. import javax.activation.FileDataSource;

  33. import org.openspcoop2.core.controllo_traffico.AttivazionePolicy;
  34. import org.openspcoop2.core.controllo_traffico.ConfigurazionePolicy;
  35. import org.openspcoop2.core.controllo_traffico.beans.ActivePolicy;
  36. import org.openspcoop2.core.controllo_traffico.beans.ConfigurazioneControlloTraffico;
  37. import org.openspcoop2.core.controllo_traffico.beans.DatiCollezionati;
  38. import org.openspcoop2.core.controllo_traffico.beans.DatiTransazione;
  39. import org.openspcoop2.core.controllo_traffico.beans.IDUnivocoGroupByPolicy;
  40. import org.openspcoop2.core.controllo_traffico.beans.UniqueIdentifierUtilities;
  41. import org.openspcoop2.core.controllo_traffico.constants.TipoRisorsa;
  42. import org.openspcoop2.core.controllo_traffico.driver.IGestorePolicyAttive;
  43. import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreads;
  44. import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreadsInMemory;
  45. import org.openspcoop2.core.controllo_traffico.driver.PolicyException;
  46. import org.openspcoop2.core.controllo_traffico.driver.PolicyGroupByActiveThreadsType;
  47. import org.openspcoop2.core.controllo_traffico.driver.PolicyNotFoundException;
  48. import org.openspcoop2.core.controllo_traffico.driver.PolicyShutdownException;
  49. import org.openspcoop2.core.controllo_traffico.utils.serializer.JaxbDeserializer;
  50. import org.openspcoop2.core.controllo_traffico.utils.serializer.JaxbSerializer;
  51. import org.openspcoop2.pdd.config.DynamicClusterManager;
  52. import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
  53. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.HazelcastManager;
  54. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedLocalCache;
  55. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCache;
  56. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutAsync;
  57. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutSync;
  58. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNoCache;
  59. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedReplicatedMap;
  60. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.PolicyGroupByActiveThreadsDistributedRedis;
  61. import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.RedissonManager;
  62. import org.openspcoop2.pdd.services.OpenSPCoop2Startup;
  63. import org.openspcoop2.protocol.basic.Costanti;
  64. import org.openspcoop2.protocol.sdk.state.IState;
  65. import org.openspcoop2.utils.SemaphoreLock;
  66. import org.openspcoop2.utils.Utilities;
  67. import org.openspcoop2.utils.io.ZipUtilities;
  68. import org.openspcoop2.utils.resources.FileSystemUtilities;
  69. import org.redisson.api.RedissonClient;
  70. import org.slf4j.Logger;

  71. /**    
  72.  * GestorePolicyAttiveInMemory
  73.  *
  74.  * @author Poli Andrea (poli@link.it)
  75.  * @author $Author$
  76.  * @version $Rev$, $Date$
  77.  */
  78. public class GestorePolicyAttiveInMemory implements IGestorePolicyAttive {


  79.     /**
  80.      * Threads allocati sulle Policy. La chiave รจ l'active-policy-id
  81.      **/
  82.     private Map<String, IPolicyGroupByActiveThreadsInMemory> mapActiveThreadsPolicy =
  83.             new HashMap<String, IPolicyGroupByActiveThreadsInMemory>();
  84.     private final org.openspcoop2.utils.Semaphore lock = new org.openspcoop2.utils.Semaphore("GestorePolicyAttiveInMemory");
  85.    
  86.     private static final String IMPL_DESCR = "Implementazione InMemory IGestorePolicyAttive";
  87.     public static String getImplDescr(){
  88.         return IMPL_DESCR;
  89.     }
  90.    
  91.     private Logger log;
  92.     private PolicyGroupByActiveThreadsType type;
  93.     private boolean useCountersWithLock = false;
  94.     @Override
  95.     public void initialize(Logger log, boolean isStartupGovWay, PolicyGroupByActiveThreadsType type, Object ... params) throws PolicyException{
  96.         this.log = log;
  97.         this.type = type;
  98.         if(this.type==null) {
  99.             this.type = PolicyGroupByActiveThreadsType.LOCAL;
  100.         }
  101.        
  102.         switch (this.type) {
  103.         case LOCAL:
  104.             break;
  105.         case LOCAL_DIVIDED_BY_NODES:
  106.             if(!DynamicClusterManager.isInitialized()) {
  107.                 try {
  108.                     DynamicClusterManager.initStaticInstance();
  109.                     DynamicClusterManager.getInstance().setRateLimitingGestioneCluster(true);
  110.                     DynamicClusterManager.getInstance().register(log);
  111.                     OpenSPCoop2Startup.startTimerClusterDinamicoThread();
  112.                 }catch(Exception e) {
  113.                     throw new PolicyException(e.getMessage(),e);
  114.                 }
  115.             }
  116.             else {
  117.                 try {
  118.                     DynamicClusterManager.getInstance().setRateLimitingGestioneCluster(true);
  119.                 }catch(Exception e) {
  120.                     throw new PolicyException(e.getMessage(),e);
  121.                 }
  122.             }
  123.             break;
  124.            
  125.         case DATABASE:
  126.             break;
  127.        
  128.         case HAZELCAST_MAP:
  129.         case HAZELCAST_NEAR_CACHE:
  130.         case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
  131.         case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
  132.         case HAZELCAST_PNCOUNTER:
  133.         case HAZELCAST_ATOMIC_LONG:
  134.         case HAZELCAST_ATOMIC_LONG_ASYNC:
  135.         case HAZELCAST_REPLICATED_MAP:
  136.             HazelcastManager.getInstance(this.type);
  137.             break;
  138.         case HAZELCAST_LOCAL_CACHE:
  139.             HazelcastManager.getInstance(this.type);
  140.             if(!OpenSPCoop2Startup.isStartedTimerClusteredRateLimitingLocalCache()) {
  141.                 try {
  142.                     OpenSPCoop2Startup.startTimerClusteredRateLimitingLocalCache(this);
  143.                 }catch(Exception e) {
  144.                     throw new PolicyException(e.getMessage(),e);
  145.                 }
  146.             }
  147.             break;

  148.         case REDISSON_MAP:
  149.         case REDISSON_ATOMIC_LONG:
  150.         case REDISSON_LONGADDER:
  151.            
  152.             boolean throwInitializingException = true;
  153.             if(isStartupGovWay) {
  154.                 throwInitializingException = OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRedisThrowExceptionIfRedisNotReady();
  155.             }
  156.            
  157.             try {
  158.                 RedissonManager.getRedissonClient(throwInitializingException);
  159.             }catch(Exception e) {
  160.                 throw new PolicyException(e.getMessage(),e);
  161.             }
  162.             break;
  163.         }
  164.        
  165.         if(this.type.isHazelcastCounters() || this.type.isRedisCounters()) {
  166.             this.useCountersWithLock=OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRemoteCountersUseLocalLock();
  167.         }
  168.     }
  169.    
  170.     private boolean isStop = false;
  171.    
  172.    
  173.     @Override
  174.     public PolicyGroupByActiveThreadsType getType() {
  175.         return this.type;
  176.     }
  177.    
  178.     @Override
  179.     public IPolicyGroupByActiveThreads getActiveThreadsPolicy(ActivePolicy activePolicy,DatiTransazione datiTransazione, Object state) throws PolicyShutdownException,PolicyException {    
  180.        
  181.         String uniqueIdMap = UniqueIdentifierUtilities.getUniqueId(activePolicy.getInstanceConfiguration());
  182.                
  183.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  184.         SemaphoreLock slock = this.lock.acquireThrowRuntime("getActiveThreadsPolicy(ActivePolicy)");
  185.         try {
  186.            
  187.             if(this.isStop){
  188.                 throw new PolicyShutdownException("Policy Manager shutdown");
  189.             }
  190.            
  191.             IPolicyGroupByActiveThreadsInMemory active = null;
  192.             /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] contains["+this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)+"]...");*/
  193.             if(this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)){
  194.                 active = this.mapActiveThreadsPolicy.get(uniqueIdMap);
  195.                 /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] GET");*/
  196.             }
  197.             else{
  198.                 active = newPolicyGroupByActiveThreadsInMemory(activePolicy, uniqueIdMap, datiTransazione, state);
  199.                 this.mapActiveThreadsPolicy.put(uniqueIdMap, active);
  200.                 /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] CREATE");*/
  201.             }
  202.             return active;
  203.         }finally {
  204.             this.lock.release(slock, "getActiveThreadsPolicy(ActivePolicy)");
  205.         }
  206.     }
  207.     @Override
  208.     public IPolicyGroupByActiveThreads getActiveThreadsPolicy(String uniqueIdMap) throws PolicyShutdownException,PolicyException,PolicyNotFoundException { // usata per la remove
  209.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  210.         SemaphoreLock slock = this.lock.acquireThrowRuntime("getActiveThreadsPolicy(uniqueIdMap)");
  211.         try {
  212.            
  213.             if(this.isStop){
  214.                 throw new PolicyShutdownException("Policy Manager shutdown");
  215.             }
  216.            
  217.             IPolicyGroupByActiveThreads active = null;
  218.             /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] contains["+this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)+"]...");*/
  219.             if(this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)){
  220.                 active = this.mapActiveThreadsPolicy.get(uniqueIdMap);
  221.                 /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] GET");*/
  222.             }
  223.             else{
  224.                 throw new PolicyNotFoundException("ActivePolicy ["+uniqueIdMap+"] notFound");
  225.             }
  226.             return active;
  227.         }finally{
  228.             this.lock.release(slock, "getActiveThreadsPolicy(uniqueIdMap)");
  229.         }
  230.     }
  231.    
  232.     @Override
  233.     public long sizeActivePolicyThreads(boolean sum) throws PolicyShutdownException,PolicyException{
  234.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  235.         SemaphoreLock slock = this.lock.acquireThrowRuntime("sizeActivePolicyThreads");
  236.         try {
  237.            
  238.             if(this.isStop){
  239.                 throw new PolicyShutdownException("Policy Manager shutdown");
  240.             }
  241.            
  242.             if(sum){
  243.                 long sumLong = 0;
  244.                 if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
  245.                     for (String idPolicy : this.mapActiveThreadsPolicy.keySet()) {
  246.                         sumLong = sumLong +this.mapActiveThreadsPolicy.get(idPolicy).getActiveThreads();
  247.                     }
  248.                 }
  249.                 return sumLong;
  250.             }else{
  251.                 return this.mapActiveThreadsPolicy.size();
  252.             }
  253.         }finally {
  254.             this.lock.release(slock, "sizeActivePolicyThreads");
  255.         }
  256.     }
  257.    
  258.     @Override
  259.     public String printKeysPolicy(String separator) throws PolicyShutdownException, PolicyException{
  260.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  261.         SemaphoreLock slock = this.lock.acquireThrowRuntime("printKeysPolicy");
  262.         try {
  263.            
  264.             if(this.isStop){
  265.                 throw new PolicyShutdownException("Policy Manager shutdown");
  266.             }
  267.            
  268.             StringBuilder bf = new StringBuilder();
  269.             if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
  270.                 int i = 0;
  271.                 for (String idPolicy : this.mapActiveThreadsPolicy.keySet()) {
  272.                     String key = idPolicy;
  273.                     if(i>0){
  274.                         bf.append(separator);
  275.                     }
  276.                     bf.append("Cache-"+this.type+"["+i+"]=["+key+"]");
  277.                     i++;
  278.                 }
  279.             }
  280.             return bf.toString();
  281.         }finally {
  282.             this.lock.release(slock, "printKeysPolicy");
  283.         }
  284.     }
  285.    
  286.     @Override
  287.     public String printInfoPolicy(String id, String separatorGroups) throws PolicyShutdownException,PolicyException,PolicyNotFoundException{
  288.         IPolicyGroupByActiveThreadsInMemory activeThreads = (IPolicyGroupByActiveThreadsInMemory) this.getActiveThreadsPolicy(id);  
  289.         try{
  290.             return activeThreads.printInfos(this.log, separatorGroups);
  291.         }catch(Exception e){
  292.             throw new PolicyException(e.getMessage(),e);
  293.         }
  294.     }
  295.    
  296.     @Override
  297.     public void removeActiveThreadsPolicy(String idActivePolicy) throws PolicyShutdownException, PolicyException{
  298.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  299.         SemaphoreLock slock = this.lock.acquireThrowRuntime("removeActiveThreadsPolicy");
  300.         try {
  301.            
  302.             if(this.isStop){
  303.                 throw new PolicyShutdownException("Policy Manager shutdown");
  304.             }
  305.            
  306.             if(this.mapActiveThreadsPolicy.containsKey(idActivePolicy)){
  307.                 this.mapActiveThreadsPolicy.remove(idActivePolicy);
  308.             }
  309.         }finally {
  310.             this.lock.release(slock, "removeActiveThreadsPolicy");
  311.         }
  312.     }
  313.    
  314.     @Override
  315.     public void removeActiveThreadsPolicyUnsafe(String idActivePolicy) throws PolicyShutdownException,PolicyException{
  316.         if(this.isStop){
  317.             throw new PolicyShutdownException("Policy Manager shutdown");
  318.         }
  319.        
  320.         IPolicyGroupByActiveThreadsInMemory policy = this.mapActiveThreadsPolicy.remove(idActivePolicy);
  321.         if(policy!=null) {
  322.             try {
  323.                 policy.remove();
  324.             }catch(Throwable e) {
  325.                 this.log.error("removeActiveThreadsPolicyUnsafe failed: "+e.getMessage(),e);
  326.             }
  327.         }
  328.     }
  329.    
  330.     @Override
  331.     public void removeAllActiveThreadsPolicy() throws PolicyShutdownException, PolicyException{
  332.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  333.         SemaphoreLock slock = this.lock.acquireThrowRuntime("removeAllActiveThreadsPolicy");
  334.         try {
  335.            
  336.             if(this.isStop){
  337.                 throw new PolicyShutdownException("Policy Manager shutdown");
  338.             }
  339.            
  340.             this.mapActiveThreadsPolicy.clear();
  341.         }finally {
  342.             this.lock.release(slock, "removeAllActiveThreadsPolicy");
  343.         }
  344.     }
  345.    
  346.     @Override
  347.     public void resetCountersActiveThreadsPolicy(String idActivePolicy) throws PolicyShutdownException, PolicyException{
  348.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  349.         SemaphoreLock slock = this.lock.acquireThrowRuntime("resetCountersActiveThreadsPolicy");
  350.         try {
  351.            
  352.             if(this.isStop){
  353.                 throw new PolicyShutdownException("Policy Manager shutdown");
  354.             }
  355.            
  356.             if(this.mapActiveThreadsPolicy.containsKey(idActivePolicy)){
  357.                 this.mapActiveThreadsPolicy.get(idActivePolicy).resetCounters();
  358.             }
  359.         }finally {
  360.             this.lock.release(slock, "resetCountersActiveThreadsPolicy");
  361.         }
  362.     }
  363.    
  364.     @Override
  365.     public void resetCountersAllActiveThreadsPolicy() throws PolicyShutdownException, PolicyException{
  366.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  367.         SemaphoreLock slock = this.lock.acquireThrowRuntime("resetCountersAllActiveThreadsPolicy");
  368.         try {
  369.            
  370.             if(this.isStop){
  371.                 throw new PolicyShutdownException("Policy Manager shutdown");
  372.             }
  373.            
  374.             if(this.mapActiveThreadsPolicy.size()>0){
  375.                 for (String key : this.mapActiveThreadsPolicy.keySet()) {
  376.                     this.mapActiveThreadsPolicy.get(key).resetCounters();
  377.                 }
  378.             }
  379.         }finally {
  380.             this.lock.release(slock, "resetCountersAllActiveThreadsPolicy");
  381.         }
  382.     }
  383.    
  384.     public Set<Entry<String, IPolicyGroupByActiveThreadsInMemory>> entrySet() throws PolicyShutdownException, PolicyException{
  385.         Set<Entry<String, IPolicyGroupByActiveThreadsInMemory>> activeThreadsPolicies;
  386.        
  387.         SemaphoreLock slock = this.lock.acquireThrowRuntime("updateLocalCacheMap");
  388.         try {
  389.            
  390.             if(this.isStop){
  391.                 throw new PolicyShutdownException("Policy Manager shutdown");
  392.             }
  393.             activeThreadsPolicies = this.mapActiveThreadsPolicy.entrySet();
  394.         } finally {
  395.             this.lock.release(slock, "updateLocalCacheMap");
  396.         }
  397.        
  398.         return activeThreadsPolicies;
  399.     }
  400.    
  401.    
  402.     // ---- Per salvare
  403.    
  404.     private static final String ZIP_POLICY_PREFIX = "policy-";
  405.     private static final String ZIP_POLICY_ID_ACTIVE_SUFFIX = "-id-active.txt";
  406.     private static final String ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX = "ConfigurazionePolicy.xml";
  407.     private static final String ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX = "AttivazionePolicy.xml";
  408.     private static final String ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX = "-id-datiCollezionati.txt";
  409.     private static final String ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX = "-datiCollezionati.txt";
  410.    
  411.     @Override
  412.     public void serialize(OutputStream out) throws PolicyException{
  413.                
  414.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  415.         SemaphoreLock slock = null;
  416.         try {
  417.             slock = this.lock.acquireThrowRuntime("serialize");
  418.            
  419.             if(this.isStop){
  420.                 throw new PolicyException("Already serialized");
  421.             }
  422.             this.isStop = true;
  423.            
  424.             if(this.mapActiveThreadsPolicy==null || this.mapActiveThreadsPolicy.size()<=0){
  425.                 return;
  426.             }
  427.            
  428.             ZipOutputStream zipOut = null;
  429.             try{
  430.                 zipOut = new ZipOutputStream(out);

  431.                 String rootPackageDir = "";
  432.                 // Il codice dopo fissa il problema di inserire una directory nel package.
  433.                 // Commentare la riga sotto per ripristinare il vecchio comportamento.
  434.                 rootPackageDir = Costanti.OPENSPCOOP2_ARCHIVE_ROOT_DIR+File.separatorChar;
  435.                
  436.                 // indice
  437.                 int index = 1;
  438.                
  439.                 // Chiavi possiedono la policy id
  440.                 for (String idActivePolicy : this.mapActiveThreadsPolicy.keySet()) {
  441.                    
  442.                     // Id File
  443.                     String idFileActivePolicy = ZIP_POLICY_PREFIX+index;
  444.                    
  445.                     // File contenente l'identificativo della policy attivata
  446.                     String nomeFile = idFileActivePolicy+ZIP_POLICY_ID_ACTIVE_SUFFIX;
  447.                     zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
  448.                     zipOut.write(idActivePolicy.getBytes());
  449.                    
  450.                     // GroupByThread
  451.                     IPolicyGroupByActiveThreadsInMemory active = this.mapActiveThreadsPolicy.get(idActivePolicy);
  452.                     if(active!=null){
  453.                        
  454.                         ActivePolicy activePolicy = active.getActivePolicy();
  455.                         JaxbSerializer serializer = new JaxbSerializer();
  456.                        
  457.                         byte[] cPolicy = null;
  458.                         if(activePolicy.getConfigurazionePolicy()!=null){
  459.                             try {
  460.                                 cPolicy = serializer.toByteArray(activePolicy.getConfigurazionePolicy());
  461.                             }catch(Throwable t) {
  462.                                 this.log.error("["+this.type+"] Serializzazione configurazione policy '"+activePolicy.getConfigurazionePolicy().getIdPolicy()+"' fallita: "+t.getMessage(),t);
  463.                             }
  464.                             if(cPolicy==null) {
  465.                                 continue;
  466.                             }
  467.                         }
  468.                        
  469.                         byte[] aPolicy = null;
  470.                         if(activePolicy.getInstanceConfiguration()!=null){
  471.                             try {
  472.                                 aPolicy = serializer.toByteArray(activePolicy.getInstanceConfiguration());
  473.                             }catch(Throwable t) {
  474.                                 this.log.error("["+this.type+"] Serializzazione attivazione policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
  475.                             }
  476.                             if(aPolicy==null) {
  477.                                 continue;
  478.                             }
  479.                         }
  480.                        
  481.                        
  482.                         // ConfigurazionePolicy
  483.                         if(activePolicy.getConfigurazionePolicy()!=null){
  484.                             nomeFile = idFileActivePolicy+File.separatorChar+ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX;
  485.                             zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
  486.                             zipOut.write(cPolicy);
  487.                         }
  488.                        
  489.                         // AttivazionePolicy
  490.                         if(activePolicy.getInstanceConfiguration()!=null){
  491.                             nomeFile = idFileActivePolicy+File.separatorChar+ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX;
  492.                             zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
  493.                             zipOut.write(aPolicy);
  494.                         }
  495.                        
  496.                         Map<IDUnivocoGroupByPolicy, DatiCollezionati> map = active.getMapActiveThreads();
  497.                         if(map!=null && map.size()>0){
  498.                            
  499.                             // indice
  500.                             int indexDatoCollezionato = 1;
  501.                            
  502.                             // Chiavi dei raggruppamenti
  503.                             for (IDUnivocoGroupByPolicy idUnivocoGroupByPolicy : map.keySet()) {
  504.                                
  505.                                 // Id Raggruppamento
  506.                                 String idFileRaggruppamento = idFileActivePolicy+File.separatorChar+"groupBy"+File.separatorChar+"groupBy-"+indexDatoCollezionato;
  507.                                
  508.                                 String id = null;
  509.                                 try {
  510.                                     id = IDUnivocoGroupByPolicy.serialize(idUnivocoGroupByPolicy);
  511.                                 }catch(Throwable t) {
  512.                                     this.log.error("["+this.type+"] Serializzazione idUnivocoGroupByPolicy ("+idUnivocoGroupByPolicy+") della policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
  513.                                 }
  514.                                 if(id==null) {
  515.                                     continue;
  516.                                 }
  517.                                
  518.                                 DatiCollezionati datiCollezionati = map.get(idUnivocoGroupByPolicy);
  519.                                 String dati = null;
  520.                                 try {
  521.                                     dati = DatiCollezionati.serialize(datiCollezionati);
  522.                                 }catch(Throwable t) {
  523.                                     this.log.error("["+this.type+"] Serializzazione dati per idUnivocoGroupByPolicy ("+idUnivocoGroupByPolicy+") della policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
  524.                                 }
  525.                                 if(dati==null) {
  526.                                     continue;
  527.                                 }
  528.                                
  529.                                 // File contenente l'identificativo del raggruppamento
  530.                                 nomeFile = idFileRaggruppamento+ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX;
  531.                                 zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
  532.                                 zipOut.write(id.getBytes());
  533.                                
  534.                                 // DatiCollezionati
  535.                                 // NOTA: l'ulteriore directory serve a garantire il corretto ordine di ricostruzione
  536.                                 nomeFile = idFileRaggruppamento+File.separatorChar+"dati"+ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX;
  537.                                 zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
  538.                                 zipOut.write(dati.getBytes());
  539.                                
  540.                                 // increment
  541.                                 indexDatoCollezionato++;
  542.                             }
  543.                            
  544.                         }
  545.                     }
  546.                    
  547.                     // increment
  548.                     index++;
  549.                 }
  550.                
  551.                 zipOut.flush();

  552.             }catch(Exception e){
  553.                 throw new PolicyException(e.getMessage(),e);
  554.             }finally{
  555.                 try{
  556.                     if(zipOut!=null)
  557.                         zipOut.close();
  558.                 }catch(Exception eClose){}
  559.             }

  560.         }finally {
  561.             this.lock.release(slock, "serialize");
  562.         }
  563.     }
  564.    
  565.     @Override
  566.     public void initialize(InputStream in,ConfigurazioneControlloTraffico configurazioneControlloTraffico) throws PolicyException{
  567.        
  568.         /**synchronized (this.mapActiveThreadsPolicy) {*/
  569.         SemaphoreLock slock = null;
  570.         try {
  571.             slock = this.lock.acquireThrowRuntime("initialize");
  572.            
  573.             if(in==null){
  574.                 return;
  575.             }
  576.            
  577.             File f = null;
  578.             ZipFile zipFile = null;
  579.             String entryName = null;
  580.             try{
  581.                
  582.                 // Leggo InputStream
  583.                 byte [] bytesIn = Utilities.getAsByteArray(in);
  584.                 in.close();
  585.                 in = null;
  586.                 if(bytesIn==null || bytesIn.length<=0){
  587.                     return;
  588.                 }
  589.                 f = FileSystemUtilities.createTempFile("controlloTraffico", ".tmp");
  590.                 FileSystemUtilities.writeFile(f, bytesIn);
  591.                
  592.                 // Leggo Struttura ZIP
  593.                 try {
  594.                     zipFile = new ZipFile(f);
  595.                 }catch(Throwable t) {
  596.                     this.log.error("Inizializzazione immagine ControlloTraffico precedente allo shutdown non ripristinabile, immagine corrotta: "+t.getMessage(),t);
  597.                     return;
  598.                 }
  599.                
  600.                 JaxbDeserializer deserializer = new JaxbDeserializer();
  601.                
  602.                 String rootPackageDir = Costanti.OPENSPCOOP2_ARCHIVE_ROOT_DIR+File.separatorChar;
  603.                
  604.                 String rootDir = null;
  605.                
  606.                 String idActivePolicy = null;
  607.                 ConfigurazionePolicy configurazionePolicy = null;
  608.                 AttivazionePolicy attivazionePolicy = null;
  609.                 Map<IDUnivocoGroupByPolicy, DatiCollezionati> map = null;
  610.                
  611.                 IDUnivocoGroupByPolicy idDatiCollezionati = null;
  612.                
  613.                 Iterator<ZipEntry> it = ZipUtilities.entries(zipFile, true);
  614.                 while (it.hasNext()) {
  615.                     ZipEntry zipEntry = (ZipEntry) it.next();
  616.                     entryName = ZipUtilities.operativeSystemConversion(zipEntry.getName());
  617.                    
  618.                     //System.out.println("FILE NAME:  "+entryName);
  619.                     //System.out.println("SIZE:  "+entry.getSize());

  620.                     // Il codice dopo fissa il problema di inserire una directory nel package.
  621.                     // Commentare la riga sotto per ripristinare il vecchio comportamento.
  622.                     if(rootDir==null){
  623.                         // Calcolo ROOT DIR
  624.                         rootDir=ZipUtilities.getRootDir(entryName);
  625.                     }
  626.                    
  627.                     if(zipEntry.isDirectory()) {
  628.                         continue; // directory
  629.                     }
  630.                     else {
  631.                         FileDataSource fds = new FileDataSource(entryName);
  632.                         String nome = fds.getName();
  633.                         String tipo = nome.substring(nome.lastIndexOf(".")+1,nome.length());
  634.                         tipo = tipo.toUpperCase();
  635.                         //System.out.println("VERIFICARE NAME["+nome+"] TIPO["+tipo+"]");
  636.                        
  637.                         InputStream inputStream = zipFile.getInputStream(zipEntry);
  638.                         byte[]content = Utilities.getAsByteArray(inputStream);
  639.                        
  640.                         try{
  641.                            
  642.                             if(entryName.startsWith((rootPackageDir+ZIP_POLICY_PREFIX)) ){
  643.                                
  644.                                 if(entryName.endsWith(ZIP_POLICY_ID_ACTIVE_SUFFIX)){
  645.                                    
  646.                                     if(idActivePolicy!=null){
  647.                                         // salvo precedente immagine
  648.                                         this.mapActiveThreadsPolicy.put(idActivePolicy,
  649.                                                 this.buildPolicyGroupByActiveThreads(configurazionePolicy, attivazionePolicy, map, configurazioneControlloTraffico));
  650.                                         //System.out.println("@@@ RICOSTRUITO ID ACTIVE POLICY ["+idActivePolicy+"]");
  651.                                         idActivePolicy = null;
  652.                                         configurazionePolicy = null;
  653.                                         attivazionePolicy = null;
  654.                                         map = null;
  655.                                         idDatiCollezionati = null;
  656.                                     }
  657.                                    
  658.                                     idActivePolicy = new String(content);
  659.                                     map=new HashMap<IDUnivocoGroupByPolicy, DatiCollezionati>();
  660.                                    
  661.                                     //System.out.println("ENTRY ["+idActivePolicy+"] NUOVO ID ["+entryName+"]");
  662.                                    
  663.                                 }
  664.                                 else if(entryName.endsWith(ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX)){
  665.                                
  666.                                     configurazionePolicy = deserializer.readConfigurazionePolicy(content);
  667.                                    
  668.                                     //System.out.println("ENTRY ["+idActivePolicy+"] CONFIGURAZIONE POLICY ["+entryName+"]");
  669.                                    
  670.                                 }
  671.                                 else if(entryName.endsWith(ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX)){
  672.                                    
  673.                                     attivazionePolicy = deserializer.readAttivazionePolicy(content);
  674.                                    
  675.                                     //System.out.println("ENTRY ["+idActivePolicy+"] ATTIVAZIONE POLICY ["+entryName+"]");
  676.                                    
  677.                                 }
  678.                                 else if(entryName.endsWith(ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX)){
  679.                                    
  680.                                     idDatiCollezionati  = IDUnivocoGroupByPolicy.deserialize( new String(content) );
  681.                                    
  682.                                     //System.out.println("ENTRY ["+idActivePolicy+"] ID DATI COLLEZIONATI POLICY ["+entryName+"]");
  683.                                    
  684.                                 }
  685.                                 else if(entryName.endsWith(ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX)){
  686.                                    
  687.                                     if(idDatiCollezionati==null){
  688.                                         throw new Exception("Identificativo di group by not found");
  689.                                     }
  690.                                     map.put(idDatiCollezionati, DatiCollezionati.deserialize( new String(content) ));
  691.                                                                        
  692.                                     //System.out.println("ENTRY ["+idActivePolicy+"] DATI COLLEZIONATI POLICY ["+entryName+"]");
  693.                                    
  694.                                 }
  695.                                
  696.                             }
  697.                             else{
  698.                                 throw new Exception("Entry unknown");
  699.                             }
  700.                            
  701.                         }finally{
  702.                             try{
  703.                                 if(inputStream!=null){
  704.                                     inputStream.close();
  705.                                 }
  706.                             }catch(Exception eClose){}
  707.                         }
  708.                     }
  709.                    
  710.                 }
  711.                
  712.                 if(idActivePolicy!=null){
  713.                     // salvo precedente immagine ?
  714.                     this.mapActiveThreadsPolicy.put(idActivePolicy,
  715.                             this.buildPolicyGroupByActiveThreads(configurazionePolicy, attivazionePolicy, map, configurazioneControlloTraffico));
  716.                     //System.out.println("@@@ RICOSTRUITO FINALE ID ACTIVE POLICY ["+idActivePolicy+"]");
  717.                     idActivePolicy = null;
  718.                     configurazionePolicy = null;
  719.                     attivazionePolicy = null;
  720.                     map = null;
  721.                     idDatiCollezionati = null;
  722.                 }
  723.                
  724.             }catch(Exception e){
  725.                 throw new PolicyException("["+entryName+"] "+e.getMessage(),e);
  726.             }
  727.             finally{
  728.                 try{
  729.                     if(zipFile!=null)
  730.                         zipFile.close();
  731.                 }catch(Exception eClose){}
  732.                 try{
  733.                     if(f!=null) {
  734.                         if(!f.delete()) {
  735.                             // ignore
  736.                         }
  737.                     }
  738.                 }catch(Exception eClose){}
  739.                 try{
  740.                     if(in!=null)
  741.                         in.close();
  742.                 }catch(Exception eClose){}
  743.             }
  744.            
  745.            

  746.         }finally {
  747.             this.lock.release(slock, "initialize");
  748.         }
  749.     }
  750.    
  751.     @Override
  752.     public void cleanOldActiveThreadsPolicy() throws PolicyException{
  753.         SemaphoreLock slock =this.lock.acquireThrowRuntime("cleanOldActiveThreadsPolicy");
  754.         try {
  755.             if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
  756.                 for (String uniqueIdMap : this.mapActiveThreadsPolicy.keySet()) {
  757.                     for (PolicyGroupByActiveThreadsType tipo : GestorePolicyAttive.getTipiGestoriAttivi()) {
  758.                         if(!tipo.equals(this.type)) {
  759.                             /**System.out.println("======== RIPULISCO DALLA PARTENZA in '"+tipo+"' ["+uniqueIdMap+"] !!!");*/
  760.                             GestorePolicyAttive.getInstance(tipo).removeActiveThreadsPolicyUnsafe(uniqueIdMap);
  761.                         }
  762.                     }
  763.                 }
  764.             }
  765.         }catch(Exception e){
  766.             throw new PolicyException(e.getMessage(),e);
  767.         }  
  768.         finally {
  769.             this.lock.release(slock, "cleanOldActiveThreadsPolicy");
  770.         }
  771.     }
  772.    
  773.     private IPolicyGroupByActiveThreadsInMemory buildPolicyGroupByActiveThreads(ConfigurazionePolicy configurazionePolicy,
  774.             AttivazionePolicy attivazionePolicy, Map<IDUnivocoGroupByPolicy, DatiCollezionati> map,
  775.             ConfigurazioneControlloTraffico configurazioneControlloTraffico) throws Exception{
  776.        
  777.         if(configurazionePolicy==null){
  778.             throw new PolicyException("ConfigurazionePolicy non presente");
  779.         }
  780.         if(attivazionePolicy==null){
  781.             throw new PolicyException("AttivazionePolicy non presente");
  782.         }
  783.         if(configurazioneControlloTraffico==null){
  784.             throw new PolicyException("ConfigurazioneControlloTraffico non presente");
  785.         }
  786.        
  787.         ActivePolicy activePolicy = new ActivePolicy();
  788.         activePolicy.setConfigurazioneControlloTraffico(configurazioneControlloTraffico);
  789.         activePolicy.setConfigurazionePolicy(configurazionePolicy);
  790.         activePolicy.setInstanceConfiguration(attivazionePolicy);
  791.         activePolicy.setTipoRisorsaPolicy(TipoRisorsa.toEnumConstant(configurazionePolicy.getRisorsa(), true));    
  792.        
  793.         String uniqueIdMap = UniqueIdentifierUtilities.getUniqueId(activePolicy.getInstanceConfiguration());
  794.         IPolicyGroupByActiveThreadsInMemory p = newPolicyGroupByActiveThreadsInMemory(activePolicy, uniqueIdMap,
  795.                 null, null);
  796.        
  797.         if(map!=null && map.size()>0){
  798.             for (IDUnivocoGroupByPolicy id : map.keySet()) {
  799.                 map.get(id).initActiveRequestCounter();
  800.             }
  801.             p.initMap(map);
  802.         }
  803.        
  804.         return p;
  805.        
  806.     }
  807.    
  808.     private IPolicyGroupByActiveThreadsInMemory newPolicyGroupByActiveThreadsInMemory(ActivePolicy activePolicy, String uniqueIdMap,
  809.             DatiTransazione datiTransazione, Object state) throws PolicyException {
  810.         switch (this.type) {
  811.         case LOCAL:
  812.             return new PolicyGroupByActiveThreads(activePolicy, this.type);
  813.         case LOCAL_DIVIDED_BY_NODES:
  814.             return new PolicyGroupByActiveThreads(activePolicy, this.type);
  815.         case DATABASE:
  816.             return new PolicyGroupByActiveThreadsDB(activePolicy, this.type, uniqueIdMap,
  817.                     (state!=null && state instanceof IState) ? ((IState)state) : null,
  818.                     datiTransazione!=null ? datiTransazione.getDominio() : null,
  819.                     datiTransazione!=null ? datiTransazione.getIdTransazione() : null);
  820.         case HAZELCAST_LOCAL_CACHE:
  821.             return new PolicyGroupByActiveThreadsDistributedLocalCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  822.         case HAZELCAST_NEAR_CACHE:
  823.             return new PolicyGroupByActiveThreadsDistributedNearCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  824.         case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
  825.             return new PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutSync(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  826.         case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
  827.             return new PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutAsync(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  828.         case HAZELCAST_MAP:
  829.             return new PolicyGroupByActiveThreadsDistributedNoCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  830.         case HAZELCAST_REPLICATED_MAP:
  831.             return new PolicyGroupByActiveThreadsDistributedReplicatedMap(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
  832.         case HAZELCAST_PNCOUNTER:
  833.         case HAZELCAST_ATOMIC_LONG:
  834.         case HAZELCAST_ATOMIC_LONG_ASYNC:
  835.         case REDISSON_ATOMIC_LONG:
  836.         case REDISSON_LONGADDER:
  837.              BuilderDatiCollezionatiDistributed builder = BuilderDatiCollezionatiDistributed.getBuilder(this.type);
  838.              if(this.useCountersWithLock) {
  839.                  return new PolicyGroupByActiveThreadsDistributedCountersWithLock(activePolicy, uniqueIdMap,  builder);
  840.              }
  841.              else {
  842.                  return new PolicyGroupByActiveThreadsDistributedCounters(activePolicy, uniqueIdMap,  builder);
  843.              }
  844.         case REDISSON_MAP:
  845.            
  846.             boolean throwInitializingException = true;
  847.             RedissonClient redissonClient = null;
  848.             try {
  849.                 redissonClient = RedissonManager.getRedissonClient(throwInitializingException);
  850.             }catch(Exception e) {
  851.                 throw new PolicyException(e.getMessage(),e);
  852.             }
  853.            
  854.             return new PolicyGroupByActiveThreadsDistributedRedis(activePolicy, uniqueIdMap, redissonClient);
  855.         }
  856.         throw new PolicyException("Unsupported type '"+this.type+"'");
  857.     }
  858.    
  859. }