DatoRAtomicLong.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */

  20. package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.counters;

  21. import java.util.concurrent.CompletionStage;

  22. import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
  23. import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
  24. import org.openspcoop2.utils.Utilities;
  25. import org.openspcoop2.utils.UtilsRuntimeException;
  26. import org.redisson.api.RAtomicLong;
  27. import org.redisson.api.RedissonClient;
  28. import org.slf4j.Logger;

  29. /**
  30.  * DatoRAtomicLong
  31.  *
  32.  * @author Poli Andrea (apoli@link.it)
  33.  * @author $Author$
  34.  * @version $Rev$, $Date$
  35.  */

  36. public class DatoRAtomicLong {

  37.     private RedissonClient redisson;
  38.     private String name;
  39.    
  40.     private RAtomicLong counter;
  41.    
  42.     private int failover = -1;
  43.     public void setFailover(int failover) {
  44.         this.failover = failover;
  45.     }
  46.     private int failoverCheckEveryMs = -1;
  47.    
  48.     private Logger logControlloTraffico;
  49.    
  50.     public DatoRAtomicLong(RedissonClient redisson, String name) {
  51.         this.redisson = redisson;
  52.         this.name = name;
  53.         this.initCounter();
  54.         OpenSPCoop2Properties op2Props = OpenSPCoop2Properties.getInstance();
  55.         this.failover = -1; // da gestire in futuro se serve
  56.         this.failoverCheckEveryMs = -1;
  57.         this.logControlloTraffico = OpenSPCoop2Logger.getLoggerOpenSPCoopControlloTraffico(op2Props.isControlloTrafficoDebug());
  58.     }
  59.     private void initCounter() {
  60.         this.counter = this.redisson.getAtomicLong(this.name);
  61.     }
  62.    
  63.     public void set(long value) {
  64.         process(RAtomicLongOperation.SET, value, -1);
  65.     }
  66.     public long get() {
  67.         RAtomicLongResponse r = process(RAtomicLongOperation.GET, -1, -1);
  68.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  69.     }
  70.     public long addAndGet(long value) {
  71.         RAtomicLongResponse r = process(RAtomicLongOperation.ADD_AND_GET, value, -1);
  72.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  73.     }
  74.     public long incrementAndGet() {
  75.         RAtomicLongResponse r = process(RAtomicLongOperation.INCREMENT_AND_GET, -1, -1);
  76.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  77.     }
  78.     public long decrementAndGet() {
  79.         RAtomicLongResponse r = process(RAtomicLongOperation.DECREMENT_AND_GET, -1, -1);
  80.         return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
  81.     }
  82.     public CompletionStage<Long> addAndGetAsync(long value) {
  83.         RAtomicLongResponse r = process(RAtomicLongOperation.ADD_AND_GET_ASYNC, value, -1);
  84.         return r!=null ? r.valueAsync : null; // else non dovrebbe succedere mai
  85.     }
  86.     public CompletionStage<Long> incrementAndGetAsync() {
  87.         RAtomicLongResponse r = process(RAtomicLongOperation.INCREMENT_AND_GET_ASYNC, -1, -1);
  88.         return r!=null ? r.valueAsync : null; // else non dovrebbe succedere mai
  89.     }
  90.     public CompletionStage<Long> decrementAndGetAsync() {
  91.         RAtomicLongResponse r = process(RAtomicLongOperation.DECREMENT_AND_GET_ASYNC, -1, -1);
  92.         return r!=null ? r.valueAsync : null; // else non dovrebbe succedere mai
  93.     }
  94.     public boolean compareAndSet(long compare, long set) {
  95.         RAtomicLongResponse r = process(RAtomicLongOperation.COMPARE_AND_SET, compare, set);
  96.         return r!=null && r.valueB; // else non dovrebbe succedere mai
  97.     }
  98.     public void delete() {
  99.         process(RAtomicLongOperation.DELETE, -1, -1);
  100.     }
  101.    
  102.     private RAtomicLongResponse process(RAtomicLongOperation op, long arg1, long arg2) {
  103.         String prefix = "[Redis-RAtomicLong-"+this.name+" operation:"+op+"] ";
  104.         if(this.failover>0) {
  105.             return processFailOver(prefix, op, arg1, arg2);
  106.         }
  107.         else {
  108.             return operation(prefix, op, arg1, arg2);
  109.         }
  110.     }
  111.     private RAtomicLongResponse processFailOver(String prefix, RAtomicLongOperation op, long arg1, long arg2) {
  112.         boolean success = false;
  113.         Exception eFinal = null; // capire l'eccezione
  114.         RAtomicLongResponse v = null;
  115.         for (int i = 0; i < this.failover; i++) {
  116.             try {
  117.                 if(i>0 && this.failoverCheckEveryMs>0) {
  118.                     Utilities.sleep(this.failoverCheckEveryMs);
  119.                     initCounter();
  120.                 }
  121.                 v = operation(prefix, op, arg1, arg2);
  122.                 success=true;
  123.                 break;
  124.             } catch (Exception e) {
  125.                 eFinal = e;
  126.                 if(i==0) {
  127.                     this.logControlloTraffico.error(prefix+"rilevato contatore distrutto (verrà riprovata la creazione): "+e.getMessage(),e);
  128.                 }
  129.                 else {
  130.                     this.logControlloTraffico.error(prefix+"il tenativo i="+i+" di ricreare il contatore è fallito: "+e.getMessage(),e);
  131.                 }
  132.             }
  133.         }
  134.         if(!success) {
  135.             throwDistributedObjectDestroyedException(prefix, eFinal);
  136.         }
  137.         return v;
  138.     }
  139.     private void throwDistributedObjectDestroyedException(String prefix, Exception eFinal) {
  140.         String msg = prefix+"tutti i tentativi di ricreare il contatore sono falliti";
  141.         this.logControlloTraffico.error(msg);
  142.         if(eFinal!=null) {
  143.             throw new UtilsRuntimeException(eFinal);
  144.         }
  145.         else {
  146.             throw new UtilsRuntimeException("tutti i tentativi di ricreare il contatore sono falliti"); // l'eccezione eFinal esiste
  147.         }
  148.     }
  149.    
  150.     private RAtomicLongResponse operation(String prefix, RAtomicLongOperation op, long arg1, long arg2){
  151.         switch (op) {
  152.         case SET:
  153.             this.counter.set(arg1);
  154.             return null;
  155.         case GET:
  156.             return new RAtomicLongResponse(this.counter.get());
  157.         case ADD_AND_GET:
  158.             return new RAtomicLongResponse(this.counter.addAndGet(arg1));
  159.         case INCREMENT_AND_GET:
  160.             return new RAtomicLongResponse(this.counter.incrementAndGet());
  161.         case DECREMENT_AND_GET:
  162.             return new RAtomicLongResponse(this.counter.decrementAndGet());
  163.         case ADD_AND_GET_ASYNC:
  164.             return new RAtomicLongResponse(this.counter.addAndGetAsync(arg1));
  165.         case INCREMENT_AND_GET_ASYNC:
  166.             return new RAtomicLongResponse(this.counter.incrementAndGetAsync());
  167.         case DECREMENT_AND_GET_ASYNC:
  168.             return new RAtomicLongResponse(this.counter.decrementAndGetAsync());
  169.         case COMPARE_AND_SET:
  170.             return new RAtomicLongResponse(this.counter.compareAndSet(arg1, arg2));
  171.         case DELETE:
  172.             try {
  173.                 this.counter.delete();
  174.             }catch(Throwable e) {
  175.                 this.logControlloTraffico.error(prefix+"delete non riuscito: "+e.getMessage(),e);
  176.                 throw e;
  177.             }
  178.             return null;        
  179.         }
  180.         return null;
  181.     }
  182. }

  183. enum RAtomicLongOperation {
  184.     SET,
  185.     GET,
  186.     ADD_AND_GET, INCREMENT_AND_GET, DECREMENT_AND_GET,
  187.     ADD_AND_GET_ASYNC, INCREMENT_AND_GET_ASYNC, DECREMENT_AND_GET_ASYNC,
  188.     COMPARE_AND_SET,
  189.     DELETE
  190. }

  191. class RAtomicLongResponse{
  192.     RAtomicLongResponse(long l){
  193.         this.valueL = l;
  194.     }
  195.     RAtomicLongResponse(boolean b){
  196.         this.valueB = b;
  197.     }
  198.     RAtomicLongResponse(CompletionStage<Long> v){
  199.         this.valueAsync = v;
  200.     }
  201.     long valueL;    
  202.     boolean valueB;
  203.     CompletionStage<Long> valueAsync;
  204. }