DatoPNCounter.java

/*
 * GovWay - A customizable API Gateway 
 * https://govway.org
 * 
 * Copyright (c) 2005-2026 Link.it srl (https://link.it). 
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3, as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast.counters;

import java.util.concurrent.CompletionStage;

import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
import org.openspcoop2.utils.Utilities;
import org.slf4j.Logger;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.crdt.pncounter.PNCounter;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;

/**
 * DatoPNCounter
 *
 * @author Poli Andrea (apoli@link.it)
 * @author $Author$
 * @version $Rev$, $Date$
 */

public class DatoPNCounter {

	private HazelcastInstance hazelcast;
	private String name;

	private PNCounter counter;

	private int failover = -1;
	private int failoverCheckEveryMs = -1;

	private Logger logControlloTraffico;

	private boolean cleanupTimerEnabled = false;

	// Ultimo valore letto con successo, usato come fallback in caso di ConsistencyLostException persistente
	private volatile long lastKnownValue = 0;

	public DatoPNCounter(HazelcastInstance hazelcast, String name) {
		this.hazelcast = hazelcast;
		this.name = name;
		this.initCounter();
		OpenSPCoop2Properties op2Props = OpenSPCoop2Properties.getInstance();
		this.failover = op2Props.getHazelcastCPSubsystemDistributedObjectDestroyedExceptionFailover();
		this.failoverCheckEveryMs = op2Props.getHazelcastCPSubsystemDistributedObjectDestroyedExceptionFailoverCheckEveryMs();
		this.logControlloTraffico = OpenSPCoop2Logger.getLoggerOpenSPCoopControlloTraffico(op2Props.isControlloTrafficoDebug());
		this.cleanupTimerEnabled = op2Props.isControlloTrafficoGestorePolicyInMemoryHazelcastOrphanedProxiesCleanupEnabled();
	}
	private void initCounter() {
		this.counter = this.hazelcast.getPNCounter(this.name);
	}
	
	public String getName() {
		return this.name;
	}
	
	public long get() {
		PNCounterResponse r = process(PNCounterOperation.GET, -1, -1);
		return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
	}
	public long addAndGet(long value) {
		PNCounterResponse r = process(PNCounterOperation.ADD_AND_GET, value, -1);
		return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
	}
	public long incrementAndGet() {
		PNCounterResponse r = process(PNCounterOperation.INCREMENT_AND_GET, -1, -1);
		return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
	}
	public long decrementAndGet() {
		PNCounterResponse r = process(PNCounterOperation.DECREMENT_AND_GET, -1, -1);
		return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
	}
	public long subtractAndGet(long value) {
		PNCounterResponse r = process(PNCounterOperation.SUBTRACT_AND_GET, value, -1);
		return r!=null ? r.valueL : -1; // else non dovrebbe succedere mai
	}
	public void destroy() {
		process(PNCounterOperation.DESTROY, -1, -1);
	}
	/**
	 * Rilascia il contatore in modo sicuro, evitando la 'DistributedObjectDestroyedException'.
	 *
	 * Nel CP Subsystem di Hazelcast, una volta chiamato destroy(), il nome dell'oggetto resta "avvelenato"
	 * e non può essere ricreato: qualsiasi successiva getCPSubsystem().getAtomicLong(stessoNome)
	 * restituisce un oggetto in stato "destroyed", causando:
	 * 'com.hazelcast.spi.exception.DistributedObjectDestroyedException: AtomicValue[...] is already destroyed!'
	 *
	 * Questo si verifica quando un altro nodo del cluster sta ancora utilizzando il contatore
	 * nel momento in cui viene distrutto, anche se il destroy avviene su contatori di 2 intervalli indietro (pattern "cestino").
	 *
	 * Se il timer di cleanup dei contatori orfani è abilitato (default), il destroy non viene eseguito
	 * e la pulizia viene delegata al timer (HazelcastManager.cleanupOrphanedAtomicLongCounters).
	 * Se il timer è disabilitato, il destroy viene eseguito direttamente;
	 * eventuali errori vengono solo loggati senza propagare l'eccezione,
	 * come avviene nel timer di cleanup (HazelcastManager.cleanupOrphanedProxies).
	 */
	public void destroySafe() {
		if(this.cleanupTimerEnabled) {
			return;
		}
		try {
			destroy();
		} catch(Throwable t) {
			this.logControlloTraffico.error("[Hazelcast-PNCounter-"+this.name+"] destroySafe non riuscito: "+t.getMessage(),t);
		}
	}

	private PNCounterResponse process(PNCounterOperation op, long arg1, long arg2) {
		String prefix = "[Hazelcast-PNCounter-"+this.name+" operation:"+op+"] ";
		if(this.failover>0) {
			return processFailOver(prefix, op, arg1, arg2);
		}
		else {
			return operation(prefix, op, arg1, arg2);
		}
	}
	private PNCounterResponse processFailOver(String prefix, PNCounterOperation op, long arg1, long arg2) {
		boolean success = false;
		DistributedObjectDestroyedException eFinal = null;
		PNCounterResponse v = null;
		for (int i = 0; i < this.failover; i++) {
			try {
				if(i>0 && this.failoverCheckEveryMs>0) {
					Utilities.sleep(this.failoverCheckEveryMs);
					initCounter();
				}
				v = operation(prefix, op, arg1, arg2);
				success=true;
				break;
			} catch (DistributedObjectDestroyedException e) {
				eFinal = e;
				if(i==0) {
					this.logControlloTraffico.error("{}rilevato contatore distrutto (verrà riprovata la creazione): {}", prefix, e.getMessage(),e);
				}
				else {
					this.logControlloTraffico.error("{}il tenativo i={} di ricreare il contatore è fallito: {}", prefix, i, e.getMessage(),e);
				}
			}
		}
		if(!success) {
			throwDistributedObjectDestroyedException(prefix, eFinal);
		}
		return v;
	}
	private void throwDistributedObjectDestroyedException(String prefix, DistributedObjectDestroyedException eFinal) {
		String msg = prefix+"tutti i tentativi di ricreare il contatore sono falliti";
		this.logControlloTraffico.error(msg);
		if(eFinal!=null) {
			throw eFinal;
		}
		else {
			throw new DistributedObjectDestroyedException("tutti i tentativi di ricreare il contatore sono falliti"); // l'eccezione eFinal esiste
		}
	}
	
	private PNCounterResponse operation(String prefix, PNCounterOperation op, long arg1, long arg2){
		if(PNCounterOperation.DESTROY.equals(op)) {
			return operationEngine(this.counter, prefix, op, arg1, arg2);
		}
		try {
			PNCounterResponse r = operationEngine(this.counter, prefix, op, arg1, arg2);
			if(r != null) {
				this.lastKnownValue = r.valueL;
			}
			return r;
		} catch (Exception e) {
			// PNCounter è approssimativo (CRDT): gli errori transitori vengono gestiti con retry + fallback
			// anziché propagati al client. Eccezioni note:
			// - ConsistencyLostException: replica locale temporaneamente stale
			// - IllegalStateException ("Attempt to reuse same operation"): bug interno PNCounterProxy sotto alta concorrenza
			// - NoDataMemberInClusterException: nessun data member raggiungibile momentaneamente
			// Crea un nuovo proxy PNCounter locale per il retry (non this.counter per evitare race condition).
			/**System.out.println(prefix+"ConsistencyLostException, retry con nuova replica: "+e.getMessage());*/
			this.logControlloTraffico.debug("{}{}, retry con nuova replica: {}", prefix, e.getClass().getSimpleName(), e.getMessage());
			PNCounter retryCounter = this.hazelcast.getPNCounter(this.name);
			try {
				PNCounterResponse r = operationEngine(retryCounter, prefix, op, arg1, arg2);
				if(r != null) {
					this.lastKnownValue = r.valueL;
				}
				return r;
			} catch (Exception e2) {
				// Errore persistente: ritorna l'ultimo valore noto
				/**System.out.println(prefix+"ConsistencyLostException persistente, fallback a lastKnownValue="+this.lastKnownValue);*/
				this.logControlloTraffico.debug("{}eccezione persistente ({}), fallback a lastKnownValue={}", prefix, e2.getMessage(), this.lastKnownValue);
				return new PNCounterResponse(this.lastKnownValue);
			}
		}
	}
	private PNCounterResponse operationEngine(PNCounter pnCounter, String prefix, PNCounterOperation op, long arg1, long arg2){
		if(arg2>0) {
			// nop
		}
		switch (op) {
		case GET:
			return new PNCounterResponse(pnCounter.get());
		case ADD_AND_GET:
			return new PNCounterResponse(pnCounter.addAndGet(arg1));
		case INCREMENT_AND_GET:
			return new PNCounterResponse(pnCounter.incrementAndGet());
		case DECREMENT_AND_GET:
			return new PNCounterResponse(pnCounter.decrementAndGet());
		case SUBTRACT_AND_GET:
			return new PNCounterResponse(pnCounter.subtractAndGet(arg1));
		case DESTROY:
			try {
				pnCounter.destroy();
			}catch(Throwable e) {
				this.logControlloTraffico.error("{}destroy non riuscito: {}", prefix, e.getMessage(),e);
				throw e;
			}
			return null;
		}
		return null;
	}
}

enum PNCounterOperation {
	GET,
	ADD_AND_GET, INCREMENT_AND_GET, DECREMENT_AND_GET, SUBTRACT_AND_GET,
	DESTROY
}

class PNCounterResponse{
	PNCounterResponse(long l){
		this.valueL = l;
	}
	PNCounterResponse(boolean b){
		this.valueB = b;
	}
	PNCounterResponse(CompletionStage<Long> v){
		this.valueAsync = v;
	}
	long valueL;	
	boolean valueB;
	CompletionStage<Long> valueAsync;
}