DatoRAtomicLong.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.redisson.counters;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.UtilsRuntimeException;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
/**
* DatoRAtomicLong - Wrapper per RAtomicLong di Redisson con supporto TTL
*
* Questa classe gestisce contatori atomici distribuiti in Redis con supporto
* opzionale per TTL (Time To Live), permettendo la pulizia automatica dei
* contatori non più utilizzati.
*
* @author Poli Andrea (apoli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class DatoRAtomicLong {
private RedissonClient redisson;
private String name;
private RAtomicLong counter;
// TTL configuration
private RedisTTLConfig ttlConfig;
private boolean ttlApplied = false;
private int failover = -1;
public void setFailover(int failover) {
this.failover = failover;
}
private int failoverCheckEveryMs = -1;
private Logger logControlloTraffico;
private void logDebug(String msg) {
if (this.logControlloTraffico != null) {
this.logControlloTraffico.debug(msg);
}
}
private void logWarn(String msg) {
if (this.logControlloTraffico != null) {
this.logControlloTraffico.warn(msg);
}
}
/**
* Costruttore originale senza TTL (mantiene compatibilità)
*/
public DatoRAtomicLong(RedissonClient redisson, String name) {
this(redisson, name, null);
}
/**
* Costruttore con configurazione TTL
*/
public DatoRAtomicLong(RedissonClient redisson, String name, RedisTTLConfig ttlConfig) {
this.redisson = redisson;
this.name = name;
this.ttlConfig = ttlConfig;
this.initCounter();
OpenSPCoop2Properties op2Props = OpenSPCoop2Properties.getInstance();
this.failover = -1; // da gestire in futuro se serve
this.failoverCheckEveryMs = -1;
this.logControlloTraffico = OpenSPCoop2Logger.getLoggerOpenSPCoopControlloTraffico(op2Props.isControlloTrafficoDebug());
}
private void initCounter() {
this.counter = this.redisson.getAtomicLong(this.name);
// NOTA: il TTL viene applicato alla prima operazione di scrittura (set, increment, ecc.)
// perché Redis non permette di impostare EXPIRE su chiavi che non esistono ancora.
// getAtomicLong() non crea la chiave in Redis finché non viene fatta una scrittura.
}
/**
* Applica il TTL al contatore se configurato e non ancora applicato (versione sincrona)
*/
private void applyTTLIfNeeded() {
if (this.ttlConfig != null && this.ttlConfig.isEnabled() && !this.ttlApplied) {
try {
long ttlSeconds = this.ttlConfig.getTtlSeconds();
if (ttlSeconds > 0) {
// expire() ritorna true solo se la chiave esiste e il TTL è stato applicato
boolean applied = this.counter.expire(Duration.ofSeconds(ttlSeconds));
if (applied) {
this.ttlApplied = true;
/** System.out.println("[DEBUG-TTL] TTL APPLIED OK for: " + this.name + " | ttl=" + ttlSeconds + "s"); */
} else {
/** DEBUG: capire perché expire() ritorna false
boolean exists = this.counter.isExists();
long currentTTL = this.counter.remainTimeToLive();
System.out.println("[DEBUG-TTL] expire() returned FALSE for: " + this.name +
" | exists=" + exists + " | currentTTL=" + currentTTL + " | ttlToSet=" + ttlSeconds); */
}
}
} catch (Exception e) {
/** System.out.println("[DEBUG-TTL] EXCEPTION in applyTTLIfNeeded for: " + this.name +
" | " + e.getClass().getName() + ": " + e.getMessage()); */
this.logWarn("[Redis-TTL] Impossibile applicare TTL al contatore " +
this.name + ": " + e.getMessage());
}
}
}
/**
* Applica il TTL al contatore in modo asincrono (da usare nei callback async)
*/
private void applyTTLIfNeededAsync() {
if (this.ttlConfig != null && this.ttlConfig.isEnabled() && !this.ttlApplied) {
try {
long ttlSeconds = this.ttlConfig.getTtlSeconds();
if (ttlSeconds > 0) {
applyTTLIfNeededAsync(ttlSeconds);
}
} catch (Exception e) {
if (this.logControlloTraffico != null) {
this.logWarn("[Redis-TTL] Impossibile applicare TTL async al contatore " +
this.name + ": " + e.getMessage());
}
}
}
}
private void applyTTLIfNeededAsync(long ttlSeconds) {
// Usa expireAsync() per evitare "Sync methods can't be invoked from async listeners"
this.counter.expireAsync(Duration.ofSeconds(ttlSeconds)).thenAccept(applied -> {
if (applied != null && applied.booleanValue()) {
this.ttlApplied = true;
if (this.logControlloTraffico != null && this.logControlloTraffico.isDebugEnabled()) {
this.logDebug("[Redis-TTL] Applicato TTL async di " + ttlSeconds +
" secondi al contatore: " + this.name);
}
}
});
}
/**
* Rinnova il TTL se configurato per farlo ad ogni scrittura (versione sincrona)
*/
private void renewTTLIfNeeded() {
if (this.ttlConfig != null && this.ttlConfig.isEnabled() && this.ttlConfig.isRenewTTLOnWrite()) {
try {
long ttlSeconds = this.ttlConfig.getTtlSeconds();
if (ttlSeconds > 0) {
this.counter.expire(Duration.ofSeconds(ttlSeconds));
}
} catch (Exception e) {
if (this.logControlloTraffico != null && this.logControlloTraffico.isDebugEnabled()) {
this.logDebug("[Redis-TTL] Impossibile rinnovare TTL per " +
this.name + ": " + e.getMessage());
}
}
}
}
/**
* Rinnova il TTL in modo asincrono (da usare nei callback async)
*/
private void renewTTLIfNeededAsync() {
if (this.ttlConfig != null && this.ttlConfig.isEnabled() && this.ttlConfig.isRenewTTLOnWrite()) {
try {
long ttlSeconds = this.ttlConfig.getTtlSeconds();
if (ttlSeconds > 0) {
// Usa expireAsync() per evitare "Sync methods can't be invoked from async listeners"
this.counter.expireAsync(Duration.ofSeconds(ttlSeconds));
}
} catch (Exception e) {
if (this.logControlloTraffico != null && this.logControlloTraffico.isDebugEnabled()) {
this.logDebug("[Redis-TTL] Impossibile rinnovare TTL async per " +
this.name + ": " + e.getMessage());
}
}
}
}
/**
* Assicura che il TTL sia applicato al contatore, anche se è stato creato da un altro nodo.
* Da chiamare esplicitamente dopo operazioni di sola lettura (get) su contatori che devono avere un TTL.
* Utile per contatori come policyDate che potrebbero essere letti senza mai essere scritti da questo nodo.
*
* NOTA: Se la chiave non esiste ancora in Redis (get() ritorna 0 di default senza creare la chiave),
* il TTL non può essere applicato. In questo caso il flag ttlApplied NON viene impostato,
* così alla prossima scrittura (set, compareAndSet, ecc.) il TTL verrà applicato.
*/
public void ensureTTLApplied() {
if (this.ttlConfig != null && this.ttlConfig.isEnabled() && !this.ttlApplied) {
// Verifica se la chiave esiste prima di tentare di applicare il TTL
// perché get() di Redisson ritorna 0 senza creare la chiave
if (this.counter.isExists()) {
applyTTLIfNeeded();
}
else {
// Se la chiave non esiste, non facciamo nulla: il TTL sarà applicato
// alla prima operazione di scrittura (set, compareAndSet, increment, ecc.)
}
}
}
public void set(long value) {
process(RAtomicLongOperation.SET, value, -1);
applyTTLIfNeeded();
renewTTLIfNeeded();
}
public long get() {
RAtomicLongResponse r = process(RAtomicLongOperation.GET, -1, -1);
// Non rinnoviamo TTL su get() per evitare che letture passive mantengano vivo il contatore
return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
}
public long addAndGet(long value) {
RAtomicLongResponse r = process(RAtomicLongOperation.ADD_AND_GET, value, -1);
applyTTLIfNeeded();
renewTTLIfNeeded();
return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
}
public long incrementAndGet() {
RAtomicLongResponse r = process(RAtomicLongOperation.INCREMENT_AND_GET, -1, -1);
applyTTLIfNeeded();
renewTTLIfNeeded();
return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
}
public long decrementAndGet() {
RAtomicLongResponse r = process(RAtomicLongOperation.DECREMENT_AND_GET, -1, -1);
// Non rinnoviamo TTL su decrement per permettere la naturale scadenza
return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
}
public CompletionStage<Long> addAndGetAsync(long value) {
RAtomicLongResponse r = process(RAtomicLongOperation.ADD_AND_GET_ASYNC, value, -1);
if (r == null || r.valueAsync == null) {
return null;
}
// Applica TTL dopo che l'operazione asincrona è completata,
// quando la chiave esiste sicuramente in Redis.
// Usa versioni async per evitare "Sync methods can't be invoked from async listeners"
return r.valueAsync.thenApply(result -> {
applyTTLIfNeededAsync();
renewTTLIfNeededAsync();
return result;
});
}
public CompletionStage<Long> incrementAndGetAsync() {
RAtomicLongResponse r = process(RAtomicLongOperation.INCREMENT_AND_GET_ASYNC, -1, -1);
if (r == null || r.valueAsync == null) {
return null;
}
// Applica TTL dopo che l'operazione asincrona è completata,
// quando la chiave esiste sicuramente in Redis.
// Usa versioni async per evitare "Sync methods can't be invoked from async listeners"
return r.valueAsync.thenApply(result -> {
applyTTLIfNeededAsync();
renewTTLIfNeededAsync();
return result;
});
}
public CompletionStage<Long> decrementAndGetAsync() {
RAtomicLongResponse r = process(RAtomicLongOperation.DECREMENT_AND_GET_ASYNC, -1, -1);
// Non applichiamo/rinnoviamo TTL su decrement per permettere la naturale scadenza
return r!=null ? r.valueAsync : null;
}
public boolean compareAndSet(long compare, long set) {
RAtomicLongResponse r = process(RAtomicLongOperation.COMPARE_AND_SET, compare, set);
boolean casResult = r!=null && r.valueB;
/** System.out.println("[DEBUG-TTL] compareAndSet() for: " + this.name +
" | compare=" + compare + " | set=" + set + " | result=" + casResult); */
if (casResult) {
// CAS ha avuto successo: la chiave è stata modificata.
// Dobbiamo applicare/rinnovare il TTL SEMPRE (ignora il flag ttlApplied),
// perché questo contatore (es. policyDate) può essere riutilizzato per più intervalli
// e il TTL deve essere esteso ad ogni scrittura per evitare scadenza prematura.
forceApplyTTL();
} else {
// CAS fallito: la chiave esiste già con valore diverso.
// Applichiamo il TTL solo se non l'abbiamo già fatto (primo accesso da questo nodo).
applyTTLIfNeeded();
}
return casResult;
}
/**
* Applica o rinnova forzatamente il TTL, ignorando il flag ttlApplied.
* Usato dopo operazioni che modificano il valore (compareAndSet con successo)
* per estendere la vita del contatore.
*/
private void forceApplyTTL() {
if (this.ttlConfig != null && this.ttlConfig.isEnabled()) {
try {
long ttlSeconds = this.ttlConfig.getTtlSeconds();
if (ttlSeconds > 0) {
boolean applied = this.counter.expire(Duration.ofSeconds(ttlSeconds));
if (applied) {
this.ttlApplied = true;
/** System.out.println("[DEBUG-TTL] TTL FORCED OK for: " + this.name + " | ttl=" + ttlSeconds + "s"); */
} else {
/** System.out.println("[DEBUG-TTL] forceApplyTTL() expire returned FALSE for: " + this.name); */
}
}
} catch (Exception e) {
/** System.out.println("[DEBUG-TTL] EXCEPTION in forceApplyTTL for: " + this.name +
" | " + e.getClass().getName() + ": " + e.getMessage()); */
}
}
}
public void delete() {
process(RAtomicLongOperation.DELETE, -1, -1);
}
/**
* Restituisce il tempo rimanente prima della scadenza del TTL (in millisecondi)
* @return tempo rimanente in ms, -1 se nessun TTL, -2 se il contatore non esiste
*/
public long remainTimeToLive() {
try {
return this.counter.remainTimeToLive();
} catch (Exception e) {
if (this.logControlloTraffico != null && this.logControlloTraffico.isDebugEnabled()) {
this.logDebug("[Redis-TTL] Impossibile ottenere TTL rimanente per " +
this.name + ": " + e.getMessage());
}
return -2;
}
}
/**
* Verifica se il contatore esiste in Redis
*/
public boolean exists() {
try {
return this.counter.isExists();
} catch (Exception e) {
return false;
}
}
/**
* Imposta esplicitamente un nuovo TTL
*/
public void setTTL(long ttlSeconds) {
if (ttlSeconds > 0) {
try {
this.counter.expire(Duration.ofSeconds(ttlSeconds));
} catch (Exception e) {
if (this.logControlloTraffico != null) {
this.logWarn("[Redis-TTL] Impossibile impostare TTL per " +
this.name + ": " + e.getMessage());
}
}
}
}
/**
* Rimuove il TTL dal contatore (diventa persistente)
*/
public void clearTTL() {
try {
this.counter.clearExpire();
} catch (Exception e) {
if (this.logControlloTraffico != null) {
this.logWarn("[Redis-TTL] Impossibile rimuovere TTL per " +
this.name + ": " + e.getMessage());
}
}
}
/**
* Restituisce la configurazione TTL corrente
*/
public RedisTTLConfig getTTLConfig() {
return this.ttlConfig;
}
private RAtomicLongResponse process(RAtomicLongOperation op, long arg1, long arg2) {
String prefix = "[Redis-RAtomicLong-"+this.name+" operation:"+op+"] ";
if(this.failover>0) {
return processFailOver(prefix, op, arg1, arg2);
}
else {
return operation(prefix, op, arg1, arg2);
}
}
private RAtomicLongResponse processFailOver(String prefix, RAtomicLongOperation op, long arg1, long arg2) {
boolean success = false;
Exception eFinal = null; // capire l'eccezione
RAtomicLongResponse v = null;
for (int i = 0; i < this.failover; i++) {
try {
if(i>0 && this.failoverCheckEveryMs>0) {
Utilities.sleep(this.failoverCheckEveryMs);
initCounter();
}
v = operation(prefix, op, arg1, arg2);
success=true;
break;
} catch (Exception e) {
eFinal = e;
if(i==0) {
this.logControlloTraffico.error(prefix+"rilevato contatore distrutto (verrà riprovata la creazione): "+e.getMessage(),e);
}
else {
this.logControlloTraffico.error(prefix+"il tenativo i="+i+" di ricreare il contatore è fallito: "+e.getMessage(),e);
}
}
}
if(!success) {
throwDistributedObjectDestroyedException(prefix, eFinal);
}
return v;
}
private void throwDistributedObjectDestroyedException(String prefix, Exception eFinal) {
String msg = prefix+"tutti i tentativi di ricreare il contatore sono falliti";
this.logControlloTraffico.error(msg);
if(eFinal!=null) {
throw new UtilsRuntimeException(eFinal);
}
else {
throw new UtilsRuntimeException("tutti i tentativi di ricreare il contatore sono falliti"); // l'eccezione eFinal esiste
}
}
private RAtomicLongResponse operation(String prefix, RAtomicLongOperation op, long arg1, long arg2){
switch (op) {
case SET:
this.counter.set(arg1);
return null;
case GET:
return new RAtomicLongResponse(this.counter.get());
case ADD_AND_GET:
return new RAtomicLongResponse(this.counter.addAndGet(arg1));
case INCREMENT_AND_GET:
return new RAtomicLongResponse(this.counter.incrementAndGet());
case DECREMENT_AND_GET:
return new RAtomicLongResponse(this.counter.decrementAndGet());
case ADD_AND_GET_ASYNC:
return new RAtomicLongResponse(this.counter.addAndGetAsync(arg1));
case INCREMENT_AND_GET_ASYNC:
return new RAtomicLongResponse(this.counter.incrementAndGetAsync());
case DECREMENT_AND_GET_ASYNC:
return new RAtomicLongResponse(this.counter.decrementAndGetAsync());
case COMPARE_AND_SET:
return new RAtomicLongResponse(this.counter.compareAndSet(arg1, arg2));
case DELETE:
try {
this.counter.delete();
}catch(Throwable e) {
this.logControlloTraffico.error(prefix+"delete non riuscito: "+e.getMessage(),e);
throw e;
}
return null;
}
return null;
}
}
enum RAtomicLongOperation {
SET,
GET,
ADD_AND_GET, INCREMENT_AND_GET, DECREMENT_AND_GET,
ADD_AND_GET_ASYNC, INCREMENT_AND_GET_ASYNC, DECREMENT_AND_GET_ASYNC,
COMPARE_AND_SET,
DELETE
}
class RAtomicLongResponse{
RAtomicLongResponse(long l){
this.valueL = l;
}
RAtomicLongResponse(boolean b){
this.valueB = b;
}
RAtomicLongResponse(CompletionStage<Long> v){
this.valueAsync = v;
}
long valueL;
boolean valueB;
CompletionStage<Long> valueAsync;
}