PolicyGroupByActiveThreadsDistributedRedis.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */
  20. package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson;

  21. import java.util.ArrayList;
  22. import java.util.Date;
  23. import java.util.List;

  24. import org.openspcoop2.core.controllo_traffico.beans.ActivePolicy;
  25. import org.openspcoop2.core.controllo_traffico.beans.DatiCollezionati;
  26. import org.openspcoop2.core.controllo_traffico.beans.IDUnivocoGroupBy;
  27. import org.openspcoop2.core.controllo_traffico.beans.IDUnivocoGroupByPolicy;
  28. import org.openspcoop2.core.controllo_traffico.beans.IDUnivocoGroupByPolicyMapId;
  29. import org.openspcoop2.core.controllo_traffico.beans.MisurazioniTransazione;
  30. import org.openspcoop2.core.controllo_traffico.beans.UniqueIdentifierUtilities;
  31. import org.openspcoop2.core.controllo_traffico.constants.Costanti;
  32. import org.openspcoop2.core.controllo_traffico.driver.IPolicyGroupByActiveThreadsInMemory;
  33. import org.openspcoop2.core.controllo_traffico.driver.PolicyException;
  34. import org.openspcoop2.core.controllo_traffico.driver.PolicyGroupByActiveThreadsType;
  35. import org.openspcoop2.core.controllo_traffico.driver.PolicyNotFoundException;
  36. import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
  37. import org.openspcoop2.pdd.core.controllo_traffico.policy.PolicyDateUtils;
  38. import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
  39. import org.openspcoop2.protocol.utils.EsitiProperties;
  40. import org.openspcoop2.utils.Map;
  41. import org.openspcoop2.utils.UtilsException;
  42. import org.redisson.api.RMap;
  43. import org.redisson.api.RTransaction;
  44. import org.redisson.api.RedissonClient;
  45. import org.redisson.api.TransactionOptions;
  46. import org.redisson.transaction.TransactionException;
  47. import org.slf4j.Logger;

  48. /**    
  49.  *  PolicyGroupByActiveThreadsDistributedRedis
  50.  *
  51.  * @author Francesco Scarlato (scarlato@link.it)
  52.  * @author $Author$
  53.  * @version $Rev$, $Date$
  54.  */
  55. public class PolicyGroupByActiveThreadsDistributedRedis  implements IPolicyGroupByActiveThreadsInMemory  {
  56.    
  57.     private final ActivePolicy activePolicy;
  58.     private final RedissonClient redisson;
  59.     private final RMap<IDUnivocoGroupByPolicy, DatiCollezionati> distributedMap;
  60.     private String mapId = null;
  61.    
  62.     protected PolicyGroupByActiveThreadsType type;
  63.    
  64.     protected String uniqueIdMap_idActivePolicy;
  65.     protected Date uniqueIdMap_updateTime;
  66.    
  67.     public PolicyGroupByActiveThreadsDistributedRedis(ActivePolicy activePolicy, String uniqueIdMap,
  68.             RedissonClient redisson) throws PolicyException {
  69.         this.activePolicy = activePolicy;
  70.         this.redisson = redisson;
  71.        
  72.         this.uniqueIdMap_idActivePolicy = UniqueIdentifierUtilities.extractIdActivePolicy(uniqueIdMap);
  73.         try {
  74.             this.uniqueIdMap_updateTime = UniqueIdentifierUtilities.extractUpdateTimeActivePolicy(uniqueIdMap);
  75.         }catch(Exception e) {
  76.             throw new PolicyException(e.getMessage(),e);
  77.         }
  78.        
  79.         OpenSPCoop2Properties op2Properties = OpenSPCoop2Properties.getInstance();
  80.         Logger log = OpenSPCoop2Logger.getLoggerOpenSPCoopControlloTraffico(op2Properties.isControlloTrafficoDebug());
  81.        
  82.         this.type = PolicyGroupByActiveThreadsType.REDISSON_MAP;
  83.        
  84.         boolean oneMapForeachPolicy = OpenSPCoop2Properties.getInstance().isControlloTrafficoGestorePolicyInMemoryRedisOneMapForeachPolicy();
  85.         String mapName = "redis-";
  86.         if (oneMapForeachPolicy) {
  87.             this.mapId = mapName + this.uniqueIdMap_idActivePolicy + "-rate-limiting";
  88.             this.distributedMap = redisson.getMap(this.mapId);
  89.             log.info("Hazelcast: Utilizzo Una Distributed Map per gruppo.");
  90.         } else {
  91.             this.mapId = mapName + "rate-limiting";
  92.             this.distributedMap = redisson.getMap(this.mapId);
  93.             log.info("Hazelcast: Utilizzo Una Distributed Map globale.");
  94.         }
  95.     }

  96.    
  97.     @Override
  98.     public DatiCollezionati registerStartRequest(Logger log, String idTransazione, IDUnivocoGroupByPolicy datiGroupBy, Map<Object> ctx)
  99.             throws PolicyException {
  100.        
  101.         RTransaction transaction = this.redisson.createTransaction(TransactionOptions.defaults());
  102.         RMap<IDUnivocoGroupByPolicy, DatiCollezionati> map = transaction.getMap(this.mapId);
  103.        
  104.         DatiCollezionati datiCollezionati = map.get(datiGroupBy);
  105.         if (datiCollezionati == null) {
  106.             Date gestorePolicyConfigDate = PolicyDateUtils.readGestorePolicyConfigDateIntoContext(ctx);
  107.             datiCollezionati = new DatiCollezionati(this.activePolicy.getInstanceConfiguration().getUpdateTime(), gestorePolicyConfigDate);
  108.         }
  109.         else {
  110.             if(datiCollezionati.getUpdatePolicyDate()!=null) {
  111.                 if(!datiCollezionati.getUpdatePolicyDate().equals(this.activePolicy.getInstanceConfiguration().getUpdateTime())) {
  112.                     // data aggiornata
  113.                     datiCollezionati.resetCounters(this.activePolicy.getInstanceConfiguration().getUpdateTime());
  114.                 }
  115.             }
  116.         }
  117.        
  118.         datiCollezionati.registerStartRequest(log, this.activePolicy, ctx);
  119.        
  120.         // mi salvo l'attuale stato
  121.         DatiCollezionati datiCollezionatiReaded = (DatiCollezionati) datiCollezionati.newInstance();
  122.        
  123.         map.fastPut(datiGroupBy, datiCollezionati);
  124.        
  125.         try {
  126.             transaction.commit();
  127.         } catch(TransactionException e) {
  128.             transaction.rollback();
  129.             throw new PolicyException("Errore durante la transazione registerStartRequest di Redis", e);
  130.         }
  131.        
  132.         return datiCollezionatiReaded;
  133.     }

  134.    
  135.     @Override
  136.     public DatiCollezionati updateDatiStartRequestApplicabile(Logger log, String idTransazione,
  137.             IDUnivocoGroupByPolicy datiGroupBy, Map<Object> ctx) throws PolicyException, PolicyNotFoundException {
  138.        
  139.         RTransaction transaction = this.redisson.createTransaction(TransactionOptions.defaults());
  140.         RMap<IDUnivocoGroupByPolicy, DatiCollezionati> map = transaction.getMap(this.mapId);

  141.         DatiCollezionati datiCollezionati = map.get(datiGroupBy);
  142.         if(datiCollezionati == null){
  143.             throw new PolicyNotFoundException("Non sono presenti alcun threads registrati per la richiesta con dati identificativi ["+datiGroupBy.toString()+"]");
  144.         }      
  145.        
  146.         datiCollezionati.updateDatiStartRequestApplicabile(log, this.activePolicy, ctx);
  147.        
  148.         // mi salvo l'attuale stato
  149.         DatiCollezionati datiCollezionatiReaded = (DatiCollezionati) datiCollezionati.newInstance();
  150.        
  151.         map.fastPut(datiGroupBy, datiCollezionati);
  152.        
  153.         try {
  154.             transaction.commit();
  155.         } catch(TransactionException e) {
  156.             transaction.rollback();
  157.             throw new PolicyException("Errore durante la transazione registerStartRequest di Redis", e);
  158.         }
  159.        
  160.         return datiCollezionatiReaded;
  161.     }

  162.     @Override
  163.     public void registerStopRequest(Logger log, String idTransazione, IDUnivocoGroupByPolicy datiGroupBy, Map<Object> ctx,
  164.             MisurazioniTransazione dati, boolean isApplicabile, boolean isViolata)
  165.             throws PolicyException, PolicyNotFoundException {
  166.        
  167.         RTransaction transaction = this.redisson.createTransaction(TransactionOptions.defaults());
  168.         RMap<IDUnivocoGroupByPolicy, DatiCollezionati> map = transaction.getMap(this.mapId);

  169.         DatiCollezionati datiCollezionati = map.get(datiGroupBy);
  170.         if(datiCollezionati == null){
  171.             throw new PolicyNotFoundException("Non sono presenti alcun threads registrati per la richiesta con dati identificativi ["+datiGroupBy.toString()+"]");
  172.         }  
  173.        
  174.         if(isApplicabile) {
  175.             datiCollezionati.registerEndRequest(log, this.activePolicy, ctx, dati);
  176.             List<Integer> esitiCodeOk = null;
  177.             List<Integer> esitiCodeKo_senzaFaultApplicativo = null;
  178.             List<Integer> esitiCodeFaultApplicativo = null;
  179.             try {
  180.                 // In queste tre di sotto pare il logger non venga utilizzato
  181.                 EsitiProperties esitiProperties = EsitiProperties.getInstanceFromProtocolName(log,dati.getProtocollo());
  182.                 esitiCodeOk = esitiProperties.getEsitiCodeOk_senzaFaultApplicativo();
  183.                 esitiCodeKo_senzaFaultApplicativo = esitiProperties.getEsitiCodeKo_senzaFaultApplicativo();
  184.                 esitiCodeFaultApplicativo = esitiProperties.getEsitiCodeFaultApplicativo();
  185.                 datiCollezionati.updateDatiEndRequestApplicabile(
  186.                         log,    // logger
  187.                         this.activePolicy, ctx, dati,
  188.                         esitiCodeOk,esitiCodeKo_senzaFaultApplicativo, esitiCodeFaultApplicativo,
  189.                         isViolata);
  190.             }catch(Exception e) {
  191.                 throw new PolicyException(e.getMessage(),e);
  192.             }
  193.             map.fastPut(datiGroupBy, datiCollezionati);
  194.         } else {
  195.             datiCollezionati.registerEndRequest(null, this.activePolicy, ctx, dati);
  196.             map.fastPut(datiGroupBy, datiCollezionati);
  197.         }
  198.        
  199.     }

  200.    
  201.     @Override
  202.     public ActivePolicy getActivePolicy() {
  203.         return this.activePolicy;
  204.     }

  205.    
  206.     @Override
  207.     public java.util.Map<IDUnivocoGroupByPolicy, DatiCollezionati> getMapActiveThreads() {
  208.         return this.distributedMap;
  209.     }

  210.    
  211.     @Override
  212.     public void initMap(java.util.Map<IDUnivocoGroupByPolicy, DatiCollezionati> map) {
  213.         this.distributedMap.putAll(map);        
  214.     }

  215.    
  216.     @Override
  217.     public long getActiveThreads() {
  218.         return getActiveThreads(null);
  219.     }

  220.     @Override
  221.     public long getActiveThreads(IDUnivocoGroupByPolicy filtro) {
  222.         long counter = 0;
  223.        
  224.         // Recupero tutta la map distribuita
  225.         var cloned = this.distributedMap.readAllEntrySet();
  226.        
  227.         for (var entry :  cloned) {
  228.             if(filtro!=null){
  229.                 IDUnivocoGroupBy<IDUnivocoGroupByPolicy> idAstype = (IDUnivocoGroupBy<IDUnivocoGroupByPolicy>) entry.getKey();
  230.                 if(!idAstype.match(filtro)){
  231.                     continue;
  232.                 }
  233.                 counter += entry.getValue().getActiveRequestCounter();
  234.             }
  235.         }
  236.        
  237.         return counter;
  238.     }

  239.     @Override
  240.     public void resetCounters() {
  241.         this.distributedMap.forEach( (var id, var dati) -> {
  242.             dati.resetCounters();
  243.         });
  244.     }
  245.    
  246.     @Override
  247.     public void remove() throws UtilsException{
  248.     //      FIX: iterando nella maniera sottostante si ottiene il seguente errore se si usa la near-cache: key cannot be of type Data! hazelcast
  249.     //      for (var entry : this.distributedMap) {
  250.         List<IDUnivocoGroupByPolicy> deleteList = new ArrayList<IDUnivocoGroupByPolicy>();
  251.         for (IDUnivocoGroupByPolicy datiGroupBy : this.distributedMap.keySet()) {
  252.             if(datiGroupBy instanceof IDUnivocoGroupByPolicyMapId){
  253.                 IDUnivocoGroupByPolicyMapId mapId = (IDUnivocoGroupByPolicyMapId) datiGroupBy;
  254.                 if(this.uniqueIdMap_idActivePolicy.equals(mapId.getUniqueMapId())) {
  255.                     deleteList.add(datiGroupBy);
  256.                 }
  257.             }
  258.         }
  259.         while(!deleteList.isEmpty()) {
  260.             IDUnivocoGroupByPolicy id = deleteList.remove(0);
  261.             this.distributedMap.remove(id);
  262.         }
  263.     }

  264.     @Override
  265.     public String printInfos(Logger log, String separatorGroups) throws UtilsException {
  266.         StringBuilder bf = new StringBuilder();

  267.         this.distributedMap.forEach( (var id, var dati) -> {
  268.             IDUnivocoGroupByPolicy datiGroupBy = id;
  269.             bf.append(separatorGroups);
  270.             bf.append("\n");
  271.             bf.append(Costanti.LABEL_MODALITA_SINCRONIZZAZIONE).append(" ").append(this.type.toLabel());
  272.             bf.append("\n");
  273.             bf.append("Criterio di Collezionamento dei Dati\n");
  274.             bf.append(datiGroupBy.toString(true));
  275.             bf.append("\n");
  276.             dati.checkDate(log, this.activePolicy); // imposta correttamente gli intervalli
  277.             bf.append(dati.toString());
  278.             bf.append("\n");            
  279.         });
  280.        
  281.         if(bf.length()<=0){
  282.             bf.append("Nessuna informazione disponibile");
  283.             return bf.toString();
  284.         }
  285.         else{
  286.             return bf.toString()+separatorGroups;
  287.         }
  288.     }

  289. }