GestorePolicyAttiveInMemory.java
- /*
- * GovWay - A customizable API Gateway
- * https://govway.org
- *
- * Copyright (c) 2005-2025 Link.it srl (https://link.it).
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 3, as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package org.openspcoop2.pdd.core.controllo_traffico.policy.driver;
- import java.io.File;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.Map.Entry;
- import java.util.Set;
- import java.util.zip.ZipEntry;
- import java.util.zip.ZipFile;
- import java.util.zip.ZipOutputStream;
- import javax.activation.FileDataSource;
- import org.openspcoop2.core.controllo_traffico.AttivazionePolicy;
- import org.openspcoop2.core.controllo_traffico.ConfigurazionePolicy;
- import org.openspcoop2.core.controllo_traffico.beans.ActivePolicy;
- import org.openspcoop2.core.controllo_traffico.beans.ConfigurazioneControlloTraffico;
- import org.openspcoop2.core.controllo_traffico.beans.DatiCollezionati;
- import org.openspcoop2.core.controllo_traffico.beans.DatiTransazione;
- import org.openspcoop2.core.controllo_traffico.beans.IDUnivocoGroupByPolicy;
- import org.openspcoop2.core.controllo_traffico.beans.UniqueIdentifierUtilities;
- import org.openspcoop2.core.controllo_traffico.constants.TipoRisorsa;
- import org.openspcoop2.core.controllo_traffico.driver.IGestorePolicyAttive;
- import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreads;
- import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreadsInMemory;
- import org.openspcoop2.core.controllo_traffico.driver.PolicyException;
- import org.openspcoop2.core.controllo_traffico.driver.PolicyGroupByActiveThreadsType;
- import org.openspcoop2.core.controllo_traffico.driver.PolicyNotFoundException;
- import org.openspcoop2.core.controllo_traffico.driver.PolicyShutdownException;
- import org.openspcoop2.core.controllo_traffico.utils.serializer.JaxbDeserializer;
- import org.openspcoop2.core.controllo_traffico.utils.serializer.JaxbSerializer;
- import org.openspcoop2.pdd.config.DynamicClusterManager;
- import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.HazelcastManager;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedLocalCache;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCache;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutAsync;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutSync;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedNoCache;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.PolicyGroupByActiveThreadsDistributedReplicatedMap;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.PolicyGroupByActiveThreadsDistributedRedis;
- import org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.RedissonManager;
- import org.openspcoop2.pdd.services.OpenSPCoop2Startup;
- import org.openspcoop2.protocol.basic.Costanti;
- import org.openspcoop2.protocol.sdk.state.IState;
- import org.openspcoop2.utils.SemaphoreLock;
- import org.openspcoop2.utils.Utilities;
- import org.openspcoop2.utils.io.ZipUtilities;
- import org.openspcoop2.utils.resources.FileSystemUtilities;
- import org.redisson.api.RedissonClient;
- import org.slf4j.Logger;
- /**
- * GestorePolicyAttiveInMemory
- *
- * @author Poli Andrea (poli@link.it)
- * @author $Author$
- * @version $Rev$, $Date$
- */
- public class GestorePolicyAttiveInMemory implements IGestorePolicyAttive {
- /**
- * Threads allocati sulle Policy. La chiave รจ l'active-policy-id
- **/
- private Map<String, IPolicyGroupByActiveThreadsInMemory> mapActiveThreadsPolicy =
- new HashMap<String, IPolicyGroupByActiveThreadsInMemory>();
- private final org.openspcoop2.utils.Semaphore lock = new org.openspcoop2.utils.Semaphore("GestorePolicyAttiveInMemory");
-
- private static final String IMPL_DESCR = "Implementazione InMemory IGestorePolicyAttive";
- public static String getImplDescr(){
- return IMPL_DESCR;
- }
-
- private Logger log;
- private PolicyGroupByActiveThreadsType type;
- private boolean useCountersWithLock = false;
- @Override
- public void initialize(Logger log, boolean isStartupGovWay, PolicyGroupByActiveThreadsType type, Object ... params) throws PolicyException{
- this.log = log;
- this.type = type;
- if(this.type==null) {
- this.type = PolicyGroupByActiveThreadsType.LOCAL;
- }
-
- switch (this.type) {
- case LOCAL:
- break;
- case LOCAL_DIVIDED_BY_NODES:
- if(!DynamicClusterManager.isInitialized()) {
- try {
- DynamicClusterManager.initStaticInstance();
- DynamicClusterManager.getInstance().setRateLimitingGestioneCluster(true);
- DynamicClusterManager.getInstance().register(log);
- OpenSPCoop2Startup.startTimerClusterDinamicoThread();
- }catch(Exception e) {
- throw new PolicyException(e.getMessage(),e);
- }
- }
- else {
- try {
- DynamicClusterManager.getInstance().setRateLimitingGestioneCluster(true);
- }catch(Exception e) {
- throw new PolicyException(e.getMessage(),e);
- }
- }
- break;
-
- case DATABASE:
- break;
-
- case HAZELCAST_MAP:
- case HAZELCAST_NEAR_CACHE:
- case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
- case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
- case HAZELCAST_PNCOUNTER:
- case HAZELCAST_ATOMIC_LONG:
- case HAZELCAST_ATOMIC_LONG_ASYNC:
- case HAZELCAST_REPLICATED_MAP:
- HazelcastManager.getInstance(this.type);
- break;
- case HAZELCAST_LOCAL_CACHE:
- HazelcastManager.getInstance(this.type);
- if(!OpenSPCoop2Startup.isStartedTimerClusteredRateLimitingLocalCache()) {
- try {
- OpenSPCoop2Startup.startTimerClusteredRateLimitingLocalCache(this);
- }catch(Exception e) {
- throw new PolicyException(e.getMessage(),e);
- }
- }
- break;
- case REDISSON_MAP:
- case REDISSON_ATOMIC_LONG:
- case REDISSON_LONGADDER:
-
- boolean throwInitializingException = true;
- if(isStartupGovWay) {
- throwInitializingException = OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRedisThrowExceptionIfRedisNotReady();
- }
-
- try {
- RedissonManager.getRedissonClient(throwInitializingException);
- }catch(Exception e) {
- throw new PolicyException(e.getMessage(),e);
- }
- break;
- }
-
- if(this.type.isHazelcastCounters() || this.type.isRedisCounters()) {
- this.useCountersWithLock=OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRemoteCountersUseLocalLock();
- }
- }
-
- private boolean isStop = false;
-
-
- @Override
- public PolicyGroupByActiveThreadsType getType() {
- return this.type;
- }
-
- @Override
- public IPolicyGroupByActiveThreads getActiveThreadsPolicy(ActivePolicy activePolicy,DatiTransazione datiTransazione, Object state) throws PolicyShutdownException,PolicyException {
-
- String uniqueIdMap = UniqueIdentifierUtilities.getUniqueId(activePolicy.getInstanceConfiguration());
-
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("getActiveThreadsPolicy(ActivePolicy)");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- IPolicyGroupByActiveThreadsInMemory active = null;
- /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] contains["+this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)+"]...");*/
- if(this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)){
- active = this.mapActiveThreadsPolicy.get(uniqueIdMap);
- /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] GET");*/
- }
- else{
- active = newPolicyGroupByActiveThreadsInMemory(activePolicy, uniqueIdMap, datiTransazione, state);
- this.mapActiveThreadsPolicy.put(uniqueIdMap, active);
- /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] CREATE");*/
- }
- return active;
- }finally {
- this.lock.release(slock, "getActiveThreadsPolicy(ActivePolicy)");
- }
- }
- @Override
- public IPolicyGroupByActiveThreads getActiveThreadsPolicy(String uniqueIdMap) throws PolicyShutdownException,PolicyException,PolicyNotFoundException { // usata per la remove
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("getActiveThreadsPolicy(uniqueIdMap)");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- IPolicyGroupByActiveThreads active = null;
- /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] contains["+this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)+"]...");*/
- if(this.mapActiveThreadsPolicy.containsKey(uniqueIdMap)){
- active = this.mapActiveThreadsPolicy.get(uniqueIdMap);
- /**System.out.println("@@@ getActiveThreadsPolicy["+uniqueIdMap+"] GET");*/
- }
- else{
- throw new PolicyNotFoundException("ActivePolicy ["+uniqueIdMap+"] notFound");
- }
- return active;
- }finally{
- this.lock.release(slock, "getActiveThreadsPolicy(uniqueIdMap)");
- }
- }
-
- @Override
- public long sizeActivePolicyThreads(boolean sum) throws PolicyShutdownException,PolicyException{
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("sizeActivePolicyThreads");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- if(sum){
- long sumLong = 0;
- if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
- for (String idPolicy : this.mapActiveThreadsPolicy.keySet()) {
- sumLong = sumLong +this.mapActiveThreadsPolicy.get(idPolicy).getActiveThreads();
- }
- }
- return sumLong;
- }else{
- return this.mapActiveThreadsPolicy.size();
- }
- }finally {
- this.lock.release(slock, "sizeActivePolicyThreads");
- }
- }
-
- @Override
- public String printKeysPolicy(String separator) throws PolicyShutdownException, PolicyException{
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("printKeysPolicy");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- StringBuilder bf = new StringBuilder();
- if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
- int i = 0;
- for (String idPolicy : this.mapActiveThreadsPolicy.keySet()) {
- String key = idPolicy;
- if(i>0){
- bf.append(separator);
- }
- bf.append("Cache-"+this.type+"["+i+"]=["+key+"]");
- i++;
- }
- }
- return bf.toString();
- }finally {
- this.lock.release(slock, "printKeysPolicy");
- }
- }
-
- @Override
- public String printInfoPolicy(String id, String separatorGroups) throws PolicyShutdownException,PolicyException,PolicyNotFoundException{
- IPolicyGroupByActiveThreadsInMemory activeThreads = (IPolicyGroupByActiveThreadsInMemory) this.getActiveThreadsPolicy(id);
- try{
- return activeThreads.printInfos(this.log, separatorGroups);
- }catch(Exception e){
- throw new PolicyException(e.getMessage(),e);
- }
- }
-
- @Override
- public void removeActiveThreadsPolicy(String idActivePolicy) throws PolicyShutdownException, PolicyException{
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("removeActiveThreadsPolicy");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- if(this.mapActiveThreadsPolicy.containsKey(idActivePolicy)){
- this.mapActiveThreadsPolicy.remove(idActivePolicy);
- }
- }finally {
- this.lock.release(slock, "removeActiveThreadsPolicy");
- }
- }
-
- @Override
- public void removeActiveThreadsPolicyUnsafe(String idActivePolicy) throws PolicyShutdownException,PolicyException{
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- IPolicyGroupByActiveThreadsInMemory policy = this.mapActiveThreadsPolicy.remove(idActivePolicy);
- if(policy!=null) {
- try {
- policy.remove();
- }catch(Throwable e) {
- this.log.error("removeActiveThreadsPolicyUnsafe failed: "+e.getMessage(),e);
- }
- }
- }
-
- @Override
- public void removeAllActiveThreadsPolicy() throws PolicyShutdownException, PolicyException{
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("removeAllActiveThreadsPolicy");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- this.mapActiveThreadsPolicy.clear();
- }finally {
- this.lock.release(slock, "removeAllActiveThreadsPolicy");
- }
- }
-
- @Override
- public void resetCountersActiveThreadsPolicy(String idActivePolicy) throws PolicyShutdownException, PolicyException{
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("resetCountersActiveThreadsPolicy");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- if(this.mapActiveThreadsPolicy.containsKey(idActivePolicy)){
- this.mapActiveThreadsPolicy.get(idActivePolicy).resetCounters();
- }
- }finally {
- this.lock.release(slock, "resetCountersActiveThreadsPolicy");
- }
- }
-
- @Override
- public void resetCountersAllActiveThreadsPolicy() throws PolicyShutdownException, PolicyException{
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = this.lock.acquireThrowRuntime("resetCountersAllActiveThreadsPolicy");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
-
- if(this.mapActiveThreadsPolicy.size()>0){
- for (String key : this.mapActiveThreadsPolicy.keySet()) {
- this.mapActiveThreadsPolicy.get(key).resetCounters();
- }
- }
- }finally {
- this.lock.release(slock, "resetCountersAllActiveThreadsPolicy");
- }
- }
-
- public Set<Entry<String, IPolicyGroupByActiveThreadsInMemory>> entrySet() throws PolicyShutdownException, PolicyException{
- Set<Entry<String, IPolicyGroupByActiveThreadsInMemory>> activeThreadsPolicies;
-
- SemaphoreLock slock = this.lock.acquireThrowRuntime("updateLocalCacheMap");
- try {
-
- if(this.isStop){
- throw new PolicyShutdownException("Policy Manager shutdown");
- }
- activeThreadsPolicies = this.mapActiveThreadsPolicy.entrySet();
- } finally {
- this.lock.release(slock, "updateLocalCacheMap");
- }
-
- return activeThreadsPolicies;
- }
-
-
- // ---- Per salvare
-
- private static final String ZIP_POLICY_PREFIX = "policy-";
- private static final String ZIP_POLICY_ID_ACTIVE_SUFFIX = "-id-active.txt";
- private static final String ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX = "ConfigurazionePolicy.xml";
- private static final String ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX = "AttivazionePolicy.xml";
- private static final String ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX = "-id-datiCollezionati.txt";
- private static final String ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX = "-datiCollezionati.txt";
-
- @Override
- public void serialize(OutputStream out) throws PolicyException{
-
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = null;
- try {
- slock = this.lock.acquireThrowRuntime("serialize");
-
- if(this.isStop){
- throw new PolicyException("Already serialized");
- }
- this.isStop = true;
-
- if(this.mapActiveThreadsPolicy==null || this.mapActiveThreadsPolicy.size()<=0){
- return;
- }
-
- ZipOutputStream zipOut = null;
- try{
- zipOut = new ZipOutputStream(out);
- String rootPackageDir = "";
- // Il codice dopo fissa il problema di inserire una directory nel package.
- // Commentare la riga sotto per ripristinare il vecchio comportamento.
- rootPackageDir = Costanti.OPENSPCOOP2_ARCHIVE_ROOT_DIR+File.separatorChar;
-
- // indice
- int index = 1;
-
- // Chiavi possiedono la policy id
- for (String idActivePolicy : this.mapActiveThreadsPolicy.keySet()) {
-
- // Id File
- String idFileActivePolicy = ZIP_POLICY_PREFIX+index;
-
- // File contenente l'identificativo della policy attivata
- String nomeFile = idFileActivePolicy+ZIP_POLICY_ID_ACTIVE_SUFFIX;
- zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
- zipOut.write(idActivePolicy.getBytes());
-
- // GroupByThread
- IPolicyGroupByActiveThreadsInMemory active = this.mapActiveThreadsPolicy.get(idActivePolicy);
- if(active!=null){
-
- ActivePolicy activePolicy = active.getActivePolicy();
- JaxbSerializer serializer = new JaxbSerializer();
-
- byte[] cPolicy = null;
- if(activePolicy.getConfigurazionePolicy()!=null){
- try {
- cPolicy = serializer.toByteArray(activePolicy.getConfigurazionePolicy());
- }catch(Throwable t) {
- this.log.error("["+this.type+"] Serializzazione configurazione policy '"+activePolicy.getConfigurazionePolicy().getIdPolicy()+"' fallita: "+t.getMessage(),t);
- }
- if(cPolicy==null) {
- continue;
- }
- }
-
- byte[] aPolicy = null;
- if(activePolicy.getInstanceConfiguration()!=null){
- try {
- aPolicy = serializer.toByteArray(activePolicy.getInstanceConfiguration());
- }catch(Throwable t) {
- this.log.error("["+this.type+"] Serializzazione attivazione policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
- }
- if(aPolicy==null) {
- continue;
- }
- }
-
-
- // ConfigurazionePolicy
- if(activePolicy.getConfigurazionePolicy()!=null){
- nomeFile = idFileActivePolicy+File.separatorChar+ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX;
- zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
- zipOut.write(cPolicy);
- }
-
- // AttivazionePolicy
- if(activePolicy.getInstanceConfiguration()!=null){
- nomeFile = idFileActivePolicy+File.separatorChar+ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX;
- zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
- zipOut.write(aPolicy);
- }
-
- Map<IDUnivocoGroupByPolicy, DatiCollezionati> map = active.getMapActiveThreads();
- if(map!=null && map.size()>0){
-
- // indice
- int indexDatoCollezionato = 1;
-
- // Chiavi dei raggruppamenti
- for (IDUnivocoGroupByPolicy idUnivocoGroupByPolicy : map.keySet()) {
-
- // Id Raggruppamento
- String idFileRaggruppamento = idFileActivePolicy+File.separatorChar+"groupBy"+File.separatorChar+"groupBy-"+indexDatoCollezionato;
-
- String id = null;
- try {
- id = IDUnivocoGroupByPolicy.serialize(idUnivocoGroupByPolicy);
- }catch(Throwable t) {
- this.log.error("["+this.type+"] Serializzazione idUnivocoGroupByPolicy ("+idUnivocoGroupByPolicy+") della policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
- }
- if(id==null) {
- continue;
- }
-
- DatiCollezionati datiCollezionati = map.get(idUnivocoGroupByPolicy);
- String dati = null;
- try {
- dati = DatiCollezionati.serialize(datiCollezionati);
- }catch(Throwable t) {
- this.log.error("["+this.type+"] Serializzazione dati per idUnivocoGroupByPolicy ("+idUnivocoGroupByPolicy+") della policy '"+activePolicy.getInstanceConfiguration().getAlias()+"' ("+activePolicy.getInstanceConfiguration().getIdActivePolicy()+") fallita: "+t.getMessage(),t);
- }
- if(dati==null) {
- continue;
- }
-
- // File contenente l'identificativo del raggruppamento
- nomeFile = idFileRaggruppamento+ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX;
- zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
- zipOut.write(id.getBytes());
-
- // DatiCollezionati
- // NOTA: l'ulteriore directory serve a garantire il corretto ordine di ricostruzione
- nomeFile = idFileRaggruppamento+File.separatorChar+"dati"+ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX;
- zipOut.putNextEntry(new ZipEntry(rootPackageDir+nomeFile));
- zipOut.write(dati.getBytes());
-
- // increment
- indexDatoCollezionato++;
- }
-
- }
- }
-
- // increment
- index++;
- }
-
- zipOut.flush();
- }catch(Exception e){
- throw new PolicyException(e.getMessage(),e);
- }finally{
- try{
- if(zipOut!=null)
- zipOut.close();
- }catch(Exception eClose){}
- }
- }finally {
- this.lock.release(slock, "serialize");
- }
- }
-
- @Override
- public void initialize(InputStream in,ConfigurazioneControlloTraffico configurazioneControlloTraffico) throws PolicyException{
-
- /**synchronized (this.mapActiveThreadsPolicy) {*/
- SemaphoreLock slock = null;
- try {
- slock = this.lock.acquireThrowRuntime("initialize");
-
- if(in==null){
- return;
- }
-
- File f = null;
- ZipFile zipFile = null;
- String entryName = null;
- try{
-
- // Leggo InputStream
- byte [] bytesIn = Utilities.getAsByteArray(in);
- in.close();
- in = null;
- if(bytesIn==null || bytesIn.length<=0){
- return;
- }
- f = FileSystemUtilities.createTempFile("controlloTraffico", ".tmp");
- FileSystemUtilities.writeFile(f, bytesIn);
-
- // Leggo Struttura ZIP
- try {
- zipFile = new ZipFile(f);
- }catch(Throwable t) {
- this.log.error("Inizializzazione immagine ControlloTraffico precedente allo shutdown non ripristinabile, immagine corrotta: "+t.getMessage(),t);
- return;
- }
-
- JaxbDeserializer deserializer = new JaxbDeserializer();
-
- String rootPackageDir = Costanti.OPENSPCOOP2_ARCHIVE_ROOT_DIR+File.separatorChar;
-
- String rootDir = null;
-
- String idActivePolicy = null;
- ConfigurazionePolicy configurazionePolicy = null;
- AttivazionePolicy attivazionePolicy = null;
- Map<IDUnivocoGroupByPolicy, DatiCollezionati> map = null;
-
- IDUnivocoGroupByPolicy idDatiCollezionati = null;
-
- Iterator<ZipEntry> it = ZipUtilities.entries(zipFile, true);
- while (it.hasNext()) {
- ZipEntry zipEntry = (ZipEntry) it.next();
- entryName = ZipUtilities.operativeSystemConversion(zipEntry.getName());
-
- //System.out.println("FILE NAME: "+entryName);
- //System.out.println("SIZE: "+entry.getSize());
- // Il codice dopo fissa il problema di inserire una directory nel package.
- // Commentare la riga sotto per ripristinare il vecchio comportamento.
- if(rootDir==null){
- // Calcolo ROOT DIR
- rootDir=ZipUtilities.getRootDir(entryName);
- }
-
- if(zipEntry.isDirectory()) {
- continue; // directory
- }
- else {
- FileDataSource fds = new FileDataSource(entryName);
- String nome = fds.getName();
- String tipo = nome.substring(nome.lastIndexOf(".")+1,nome.length());
- tipo = tipo.toUpperCase();
- //System.out.println("VERIFICARE NAME["+nome+"] TIPO["+tipo+"]");
-
- InputStream inputStream = zipFile.getInputStream(zipEntry);
- byte[]content = Utilities.getAsByteArray(inputStream);
-
- try{
-
- if(entryName.startsWith((rootPackageDir+ZIP_POLICY_PREFIX)) ){
-
- if(entryName.endsWith(ZIP_POLICY_ID_ACTIVE_SUFFIX)){
-
- if(idActivePolicy!=null){
- // salvo precedente immagine
- this.mapActiveThreadsPolicy.put(idActivePolicy,
- this.buildPolicyGroupByActiveThreads(configurazionePolicy, attivazionePolicy, map, configurazioneControlloTraffico));
- //System.out.println("@@@ RICOSTRUITO ID ACTIVE POLICY ["+idActivePolicy+"]");
- idActivePolicy = null;
- configurazionePolicy = null;
- attivazionePolicy = null;
- map = null;
- idDatiCollezionati = null;
- }
-
- idActivePolicy = new String(content);
- map=new HashMap<IDUnivocoGroupByPolicy, DatiCollezionati>();
-
- //System.out.println("ENTRY ["+idActivePolicy+"] NUOVO ID ["+entryName+"]");
-
- }
- else if(entryName.endsWith(ZIP_POLICY_CONFIGURAZIONE_POLICY_SUFFIX)){
-
- configurazionePolicy = deserializer.readConfigurazionePolicy(content);
-
- //System.out.println("ENTRY ["+idActivePolicy+"] CONFIGURAZIONE POLICY ["+entryName+"]");
-
- }
- else if(entryName.endsWith(ZIP_POLICY_ATTIVAZIONE_POLICY_SUFFIX)){
-
- attivazionePolicy = deserializer.readAttivazionePolicy(content);
-
- //System.out.println("ENTRY ["+idActivePolicy+"] ATTIVAZIONE POLICY ["+entryName+"]");
-
- }
- else if(entryName.endsWith(ZIP_POLICY_ID_DATI_COLLEZIONATI_POLICY_SUFFIX)){
-
- idDatiCollezionati = IDUnivocoGroupByPolicy.deserialize( new String(content) );
-
- //System.out.println("ENTRY ["+idActivePolicy+"] ID DATI COLLEZIONATI POLICY ["+entryName+"]");
-
- }
- else if(entryName.endsWith(ZIP_POLICY_DATI_COLLEZIONATI_POLICY_SUFFIX)){
-
- if(idDatiCollezionati==null){
- throw new Exception("Identificativo di group by not found");
- }
- map.put(idDatiCollezionati, DatiCollezionati.deserialize( new String(content) ));
-
- //System.out.println("ENTRY ["+idActivePolicy+"] DATI COLLEZIONATI POLICY ["+entryName+"]");
-
- }
-
- }
- else{
- throw new Exception("Entry unknown");
- }
-
- }finally{
- try{
- if(inputStream!=null){
- inputStream.close();
- }
- }catch(Exception eClose){}
- }
- }
-
- }
-
- if(idActivePolicy!=null){
- // salvo precedente immagine ?
- this.mapActiveThreadsPolicy.put(idActivePolicy,
- this.buildPolicyGroupByActiveThreads(configurazionePolicy, attivazionePolicy, map, configurazioneControlloTraffico));
- //System.out.println("@@@ RICOSTRUITO FINALE ID ACTIVE POLICY ["+idActivePolicy+"]");
- idActivePolicy = null;
- configurazionePolicy = null;
- attivazionePolicy = null;
- map = null;
- idDatiCollezionati = null;
- }
-
- }catch(Exception e){
- throw new PolicyException("["+entryName+"] "+e.getMessage(),e);
- }
- finally{
- try{
- if(zipFile!=null)
- zipFile.close();
- }catch(Exception eClose){}
- try{
- if(f!=null) {
- if(!f.delete()) {
- // ignore
- }
- }
- }catch(Exception eClose){}
- try{
- if(in!=null)
- in.close();
- }catch(Exception eClose){}
- }
-
-
- }finally {
- this.lock.release(slock, "initialize");
- }
- }
-
- @Override
- public void cleanOldActiveThreadsPolicy() throws PolicyException{
- SemaphoreLock slock =this.lock.acquireThrowRuntime("cleanOldActiveThreadsPolicy");
- try {
- if(this.mapActiveThreadsPolicy!=null && !this.mapActiveThreadsPolicy.isEmpty()) {
- for (String uniqueIdMap : this.mapActiveThreadsPolicy.keySet()) {
- for (PolicyGroupByActiveThreadsType tipo : GestorePolicyAttive.getTipiGestoriAttivi()) {
- if(!tipo.equals(this.type)) {
- /**System.out.println("======== RIPULISCO DALLA PARTENZA in '"+tipo+"' ["+uniqueIdMap+"] !!!");*/
- GestorePolicyAttive.getInstance(tipo).removeActiveThreadsPolicyUnsafe(uniqueIdMap);
- }
- }
- }
- }
- }catch(Exception e){
- throw new PolicyException(e.getMessage(),e);
- }
- finally {
- this.lock.release(slock, "cleanOldActiveThreadsPolicy");
- }
- }
-
- private IPolicyGroupByActiveThreadsInMemory buildPolicyGroupByActiveThreads(ConfigurazionePolicy configurazionePolicy,
- AttivazionePolicy attivazionePolicy, Map<IDUnivocoGroupByPolicy, DatiCollezionati> map,
- ConfigurazioneControlloTraffico configurazioneControlloTraffico) throws Exception{
-
- if(configurazionePolicy==null){
- throw new PolicyException("ConfigurazionePolicy non presente");
- }
- if(attivazionePolicy==null){
- throw new PolicyException("AttivazionePolicy non presente");
- }
- if(configurazioneControlloTraffico==null){
- throw new PolicyException("ConfigurazioneControlloTraffico non presente");
- }
-
- ActivePolicy activePolicy = new ActivePolicy();
- activePolicy.setConfigurazioneControlloTraffico(configurazioneControlloTraffico);
- activePolicy.setConfigurazionePolicy(configurazionePolicy);
- activePolicy.setInstanceConfiguration(attivazionePolicy);
- activePolicy.setTipoRisorsaPolicy(TipoRisorsa.toEnumConstant(configurazionePolicy.getRisorsa(), true));
-
- String uniqueIdMap = UniqueIdentifierUtilities.getUniqueId(activePolicy.getInstanceConfiguration());
- IPolicyGroupByActiveThreadsInMemory p = newPolicyGroupByActiveThreadsInMemory(activePolicy, uniqueIdMap,
- null, null);
-
- if(map!=null && map.size()>0){
- for (IDUnivocoGroupByPolicy id : map.keySet()) {
- map.get(id).initActiveRequestCounter();
- }
- p.initMap(map);
- }
-
- return p;
-
- }
-
- private IPolicyGroupByActiveThreadsInMemory newPolicyGroupByActiveThreadsInMemory(ActivePolicy activePolicy, String uniqueIdMap,
- DatiTransazione datiTransazione, Object state) throws PolicyException {
- switch (this.type) {
- case LOCAL:
- return new PolicyGroupByActiveThreads(activePolicy, this.type);
- case LOCAL_DIVIDED_BY_NODES:
- return new PolicyGroupByActiveThreads(activePolicy, this.type);
- case DATABASE:
- return new PolicyGroupByActiveThreadsDB(activePolicy, this.type, uniqueIdMap,
- (state!=null && state instanceof IState) ? ((IState)state) : null,
- datiTransazione!=null ? datiTransazione.getDominio() : null,
- datiTransazione!=null ? datiTransazione.getIdTransazione() : null);
- case HAZELCAST_LOCAL_CACHE:
- return new PolicyGroupByActiveThreadsDistributedLocalCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
- case HAZELCAST_NEAR_CACHE:
- return new PolicyGroupByActiveThreadsDistributedNearCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
- case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
- return new PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutSync(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
- case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
- return new PolicyGroupByActiveThreadsDistributedNearCacheWithoutEntryProcessorPutAsync(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
- case HAZELCAST_MAP:
- return new PolicyGroupByActiveThreadsDistributedNoCache(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
- case HAZELCAST_REPLICATED_MAP:
- return new PolicyGroupByActiveThreadsDistributedReplicatedMap(activePolicy, uniqueIdMap, HazelcastManager.getInstance(this.type));
- case HAZELCAST_PNCOUNTER:
- case HAZELCAST_ATOMIC_LONG:
- case HAZELCAST_ATOMIC_LONG_ASYNC:
- case REDISSON_ATOMIC_LONG:
- case REDISSON_LONGADDER:
- BuilderDatiCollezionatiDistributed builder = BuilderDatiCollezionatiDistributed.getBuilder(this.type);
- if(this.useCountersWithLock) {
- return new PolicyGroupByActiveThreadsDistributedCountersWithLock(activePolicy, uniqueIdMap, builder);
- }
- else {
- return new PolicyGroupByActiveThreadsDistributedCounters(activePolicy, uniqueIdMap, builder);
- }
- case REDISSON_MAP:
-
- boolean throwInitializingException = true;
- RedissonClient redissonClient = null;
- try {
- redissonClient = RedissonManager.getRedissonClient(throwInitializingException);
- }catch(Exception e) {
- throw new PolicyException(e.getMessage(),e);
- }
-
- return new PolicyGroupByActiveThreadsDistributedRedis(activePolicy, uniqueIdMap, redissonClient);
- }
- throw new PolicyException("Unsupported type '"+this.type+"'");
- }
-
- }