DatoAtomicLong.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */

  20. package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.counters;

  21. import java.util.concurrent.CompletionStage;

  22. import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
  23. import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
  24. import org.openspcoop2.utils.Utilities;
  25. import org.slf4j.Logger;

  26. import com.hazelcast.core.HazelcastInstance;
  27. import com.hazelcast.cp.IAtomicLong;
  28. import com.hazelcast.spi.exception.DistributedObjectDestroyedException;

  29. /**
  30.  * DatoAtomicLong
  31.  *
  32.  * @author Poli Andrea (apoli@link.it)
  33.  * @author $Author$
  34.  * @version $Rev$, $Date$
  35.  */

  36. public class DatoAtomicLong {

  37.     private HazelcastInstance hazelcast;
  38.     private String name;
  39.    
  40.     private IAtomicLong counter;
  41.    
  42.     private int failover = -1;
  43.     private int failoverCheckEveryMs = -1;
  44.    
  45.     private Logger logControlloTraffico;
  46.    
  47.     public DatoAtomicLong(HazelcastInstance hazelcast, String name) {
  48.         this.hazelcast = hazelcast;
  49.         this.name = name;
  50.         this.initCounter();
  51.         OpenSPCoop2Properties op2Props = OpenSPCoop2Properties.getInstance();
  52.         this.failover = op2Props.getHazelcastCPSubsystemDistributedObjectDestroyedExceptionFailover();
  53.         this.failoverCheckEveryMs = op2Props.getHazelcastCPSubsystemDistributedObjectDestroyedExceptionFailoverCheckEveryMs();
  54.         this.logControlloTraffico = OpenSPCoop2Logger.getLoggerOpenSPCoopControlloTraffico(op2Props.isControlloTrafficoDebug());
  55.     }
  56.     private void initCounter() {
  57.         this.counter = this.hazelcast.getCPSubsystem().getAtomicLong(this.name);
  58.     }
  59.    
  60.     public void set(long value) {
  61.         process(AtomicLongOperation.SET, value, -1);
  62.     }
  63.     public long get() {
  64.         AtomicLongResponse r = process(AtomicLongOperation.GET, -1, -1);
  65.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  66.     }
  67.     public long addAndGet(long value) {
  68.         AtomicLongResponse r = process(AtomicLongOperation.ADD_AND_GET, value, -1);
  69.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  70.     }
  71.     public long incrementAndGet() {
  72.         AtomicLongResponse r = process(AtomicLongOperation.INCREMENT_AND_GET, -1, -1);
  73.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  74.     }
  75.     public long decrementAndGet() {
  76.         AtomicLongResponse r = process(AtomicLongOperation.DECREMENT_AND_GET, -1, -1);
  77.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  78.     }
  79.     public CompletionStage<Long> addAndGetAsync(long value) {
  80.         AtomicLongResponse r = process(AtomicLongOperation.ADD_AND_GET_ASYNC, value, -1);
  81.         return r!=null ? r.valueAsync : null; // else non dovrebbe succedere mai
  82.     }
  83.     public CompletionStage<Long> incrementAndGetAsync() {
  84.         AtomicLongResponse r = process(AtomicLongOperation.INCREMENT_AND_GET_ASYNC, -1, -1);
  85.         return r!=null ? r.valueAsync : null; // else non dovrebbe succedere mai
  86.     }
  87.     public CompletionStage<Long> decrementAndGetAsync() {
  88.         AtomicLongResponse r = process(AtomicLongOperation.DECREMENT_AND_GET_ASYNC, -1, -1);
  89.         return r!=null ? r.valueAsync : null; // else non dovrebbe succedere mai
  90.     }
  91.     public boolean compareAndSet(long compare, long set) {
  92.         AtomicLongResponse r = process(AtomicLongOperation.COMPARE_AND_SET, compare, set);
  93.         return r!=null && r.valueB; // else non dovrebbe succedere mai
  94.     }
  95.     public void destroy() {
  96.         process(AtomicLongOperation.DESTROY, -1, -1);
  97.     }
  98.    
  99.     private AtomicLongResponse process(AtomicLongOperation op, long arg1, long arg2) {
  100.         String prefix = "[Hazelcast-IAtomicLong-"+this.name+" operation:"+op+"] ";
  101.         if(this.failover>0) {
  102.             return processFailOver(prefix, op, arg1, arg2);
  103.         }
  104.         else {
  105.             return operation(prefix, op, arg1, arg2);
  106.         }
  107.     }
  108.     private AtomicLongResponse processFailOver(String prefix, AtomicLongOperation op, long arg1, long arg2) {
  109.         boolean success = false;
  110.         DistributedObjectDestroyedException eFinal = null;
  111.         AtomicLongResponse v = null;
  112.         for (int i = 0; i < this.failover; i++) {
  113.             try {
  114.                 if(i>0 && this.failoverCheckEveryMs>0) {
  115.                     Utilities.sleep(this.failoverCheckEveryMs);
  116.                     initCounter();
  117.                 }
  118.                 v = operation(prefix, op, arg1, arg2);
  119.                 success=true;
  120.                 break;
  121.             } catch (DistributedObjectDestroyedException e) {
  122.                 eFinal = e;
  123.                 if(i==0) {
  124.                     this.logControlloTraffico.error(prefix+"rilevato contatore distrutto (verrà riprovata la creazione): "+e.getMessage(),e);
  125.                 }
  126.                 else {
  127.                     this.logControlloTraffico.error(prefix+"il tenativo i="+i+" di ricreare il contatore è fallito: "+e.getMessage(),e);
  128.                 }
  129.             }
  130.         }
  131.         if(!success) {
  132.             throwDistributedObjectDestroyedException(prefix, eFinal);
  133.         }
  134.         return v;
  135.     }
  136.     private void throwDistributedObjectDestroyedException(String prefix, DistributedObjectDestroyedException eFinal) {
  137.         String msg = prefix+"tutti i tentativi di ricreare il contatore sono falliti";
  138.         this.logControlloTraffico.error(msg);
  139.         if(eFinal!=null) {
  140.             throw eFinal;
  141.         }
  142.         else {
  143.             throw new DistributedObjectDestroyedException("tutti i tentativi di ricreare il contatore sono falliti"); // l'eccezione eFinal esiste
  144.         }
  145.     }
  146.    
  147.     private AtomicLongResponse operation(String prefix, AtomicLongOperation op, long arg1, long arg2){
  148.         switch (op) {
  149.         case SET:
  150.             this.counter.set(arg1);
  151.             return null;
  152.         case GET:
  153.             return new AtomicLongResponse(this.counter.get());
  154.         case ADD_AND_GET:
  155.             return new AtomicLongResponse(this.counter.addAndGet(arg1));
  156.         case INCREMENT_AND_GET:
  157.             return new AtomicLongResponse(this.counter.incrementAndGet());
  158.         case DECREMENT_AND_GET:
  159.             return new AtomicLongResponse(this.counter.decrementAndGet());
  160.         case ADD_AND_GET_ASYNC:
  161.             return new AtomicLongResponse(this.counter.addAndGetAsync(arg1));
  162.         case INCREMENT_AND_GET_ASYNC:
  163.             return new AtomicLongResponse(this.counter.incrementAndGetAsync());
  164.         case DECREMENT_AND_GET_ASYNC:
  165.             return new AtomicLongResponse(this.counter.decrementAndGetAsync());
  166.         case COMPARE_AND_SET:
  167.             return new AtomicLongResponse(this.counter.compareAndSet(arg1, arg2));
  168.         case DESTROY:
  169.             try {
  170.                 this.counter.destroy();
  171.             }catch(Throwable e) {
  172.                 this.logControlloTraffico.error(prefix+"destroy non riuscito: "+e.getMessage(),e);
  173.                 throw e;
  174.             }
  175.             return null;        
  176.         }
  177.         return null;
  178.     }
  179. }

  180. enum AtomicLongOperation {
  181.     SET,
  182.     GET,
  183.     ADD_AND_GET, INCREMENT_AND_GET, DECREMENT_AND_GET,
  184.     ADD_AND_GET_ASYNC, INCREMENT_AND_GET_ASYNC, DECREMENT_AND_GET_ASYNC,
  185.     COMPARE_AND_SET,
  186.     DESTROY
  187. }

  188. class AtomicLongResponse{
  189.     AtomicLongResponse(long l){
  190.         this.valueL = l;
  191.     }
  192.     AtomicLongResponse(boolean b){
  193.         this.valueB = b;
  194.     }
  195.     AtomicLongResponse(CompletionStage<Long> v){
  196.         this.valueAsync = v;
  197.     }
  198.     long valueL;    
  199.     boolean valueB;
  200.     CompletionStage<Long> valueAsync;
  201. }