LoadBalancerPool.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2025 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.behaviour.built_in.load_balance;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
import org.openspcoop2.pdd.core.behaviour.BehaviourException;
import org.openspcoop2.pdd.core.behaviour.built_in.load_balance.health_check.HealthCheckConfigurazione;
import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
import org.openspcoop2.utils.date.DateManager;
import org.openspcoop2.utils.date.DateUtils;
/**
* LoadBalancerPool
*
* @author Andrea Poli (apoli@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class LoadBalancerPool implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public static int DEFAULT_WEIGHT = 1;
private HealthCheckConfigurazione healthCheck = null;
private boolean debug = false;
public LoadBalancerPool(HealthCheckConfigurazione healthCheck) {
this.healthCheck = healthCheck;
this.debug = OpenSPCoop2Properties.getInstance().isLoadBalancerDebug();
}
@Override
public String toString() {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("toString");
try {
StringBuilder bf = new StringBuilder();
bf.append("Connectors: ").append(this.connectorMap.size());
bf.append("\nTotal Weight: ").append(this.totalWeight);
bf.append("\nPosition: ").append(this.position);
if(this.healthCheck!=null) {
bf.append("\nPassiveHealtCheck: ").append(this.healthCheck.isPassiveCheckEnabled());
if(this.healthCheck.isPassiveCheckEnabled()){
bf.append("\n Exclude for seconds: ").append(this.healthCheck.getPassiveHealthCheck_excludeForSeconds());
}
}
for (String name : this.connectorMap.keySet()) {
bf.append("\n");
bf.append("- ").append(name).append(" : ").append(" ( weight:").append(this.connectorMap.get(name));
if(this.connectorMap_activeConnections.containsKey(name)) {
bf.append(" activeConnections:").append(this.connectorMap_activeConnections.get(name));
}
if(this.connectorMap_errorDate.containsKey(name)) {
bf.append(" connectionError:").append(DateUtils.getSimpleDateFormatMs().format(this.connectorMap_errorDate.get(name)));
}
bf.append(" )");
}
return bf.toString();
}finally {
this.getLock().release("toString");
}
}
//protected Boolean semaphore = true;
private transient org.openspcoop2.utils.Semaphore _lock = null;
private synchronized void initLock() {
if(this._lock==null) {
this._lock = new org.openspcoop2.utils.Semaphore("LoadBalancerPool");
}
}
public org.openspcoop2.utils.Semaphore getLock(){
if(this._lock==null) {
initLock();
}
return this._lock;
}
protected Map<String, Integer> connectorMap = new HashMap<>();
protected Map<String, Integer> connectorMap_activeConnections = new HashMap<>();
protected Map<String, Date> connectorMap_errorDate = new HashMap<>();
private int totalWeight = 0;
private int position = -1;
public int getNextPosition(boolean checkByWeight) throws BehaviourException {
if(!isPassiveHealthCheck()) {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("getNextPosition(active)");
try {
return _getNextPosition(checkByWeight);
}finally {
this.getLock().release("getNextPosition(active)");
}
}
else {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("getNextPosition(passive)");
try {
int pos = _getNextPosition(checkByWeight);
Set<String> setOriginal = this.connectorMap.keySet();
List<String> serverList = new ArrayList<>();
if(checkByWeight) {
serverList.addAll(this.getWeightList(false));
}
else {
serverList.addAll(setOriginal);
}
Set<String> setAfterPassiveHealthCheck = passiveHealthCheck(setOriginal, false);
// prima verifica
String selectedConnector = serverList.get(pos);
if(setAfterPassiveHealthCheck.contains(selectedConnector)) {
return pos;
}
// controllo prossime posizioni fino a tornare a quella attuale
int nextPos = _getNextPosition(checkByWeight);
while(nextPos!=pos) {
selectedConnector = serverList.get(nextPos);
if(setAfterPassiveHealthCheck.contains(selectedConnector)) {
return nextPos;
}
nextPos = _getNextPosition(checkByWeight);
}
throw new BehaviourException("Nessun connettore selezionabile (passive health check)");
}finally {
this.getLock().release("getNextPosition(passive)");
}
}
}
private int _getNextPosition(boolean checkByWeight) {
this.position++;
if(checkByWeight) {
if(this.position==this.totalWeight) {
this.position = 0;
}
}
else {
if(this.position==this.connectorMap.size()) {
this.position = 0;
}
}
return this.position;
}
public List<String> getWeightList(boolean passiveHealthCheck) throws BehaviourException {
Set<String> servers = this.getConnectorNames(passiveHealthCheck);
if(servers.isEmpty()) {
throw new BehaviourException("Nessun connettore selezionabile (passive health check)");
}
List<String> serverList = new ArrayList<>();
Iterator<String> iterator = servers.iterator();
while (iterator.hasNext()) {
String server = iterator.next();
Integer weight = this.getWeight(server);
if (weight == null || weight <= 0) {
weight = LoadBalancerPool.DEFAULT_WEIGHT;
}
for (int i = 0; i < weight; i++) {
serverList.add(server);
}
}
debug("weightList (passiveHealthCheck:"+passiveHealthCheck+"): "+serverList);
return serverList;
}
private transient org.openspcoop2.utils.Semaphore _lockLeastConnectionsIndex = null;
private synchronized void initLockLeastConnectionsIndex() {
if(this._lockLeastConnectionsIndex==null) {
this._lockLeastConnectionsIndex = new org.openspcoop2.utils.Semaphore("LoadBalancerPoolLeastConnections");
}
}
public org.openspcoop2.utils.Semaphore getLockLeastConnectionsIndex(){
if(this._lockLeastConnectionsIndex==null) {
initLockLeastConnectionsIndex();
}
return this._lockLeastConnectionsIndex;
}
private int leastConnectionsIndex = 0;
private String getNextLeastConnectionsConnector(int min, List<String> listMin) {
if(listMin==null || listMin.isEmpty()) {
return null;
}
// Nel caso vi siano più connettori che sono con lo stesso numero di connessioni, viene effettuato un roundrobin
// Serve a evitare che se arrivano richieste simultanee prima della registrazione della nuova connessione (che avviene dopo non in maniera transazione)
// viene scelto il solito connettore
this.getLockLeastConnectionsIndex().acquireThrowRuntime("getNextLeastConnectionsIndex");
try {
int c = 0;
if(this.leastConnectionsIndex<listMin.size()) {
c = this.leastConnectionsIndex;
}
this.leastConnectionsIndex++;
debug("getNextConnectorLeastConnections minActiveConnections["+min+"] (ConnettoreSelezionato:"+c+"): "+listMin);
return listMin.get(c);
}finally{
this.getLockLeastConnectionsIndex().release("getNextLeastConnectionsIndex");
}
}
public String getNextConnectorLeastConnections() {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("getNextConnectorLeastConnections");
try {
debug("getNextConnectorLeastConnections situazione iniziale ("+this.connectorMap_activeConnections+")");
Set<String> setKeys = passiveHealthCheck(this.connectorMap.keySet(), false);
List<String> listMin = new ArrayList<>();
int min = 0;
if(!this.connectorMap_activeConnections.isEmpty()) {
min = Integer.MAX_VALUE;
for (String name : setKeys) {
if(this.connectorMap_activeConnections.containsKey(name)==false) {
if(min != 0) {
min = 0;
listMin.clear();
}
listMin.add(name);
}
else {
int active = this.connectorMap_activeConnections.get(name);
if(active<min) {
min = active;
listMin.clear();
listMin.add(name);
}
else if(active==min) {
listMin.add(name);
}
}
}
}
if(listMin.isEmpty()) {
listMin.addAll(setKeys);
debug("getNextConnectorLeastConnections: list is empty");
}
return getNextLeastConnectionsConnector(min, listMin);
}finally{
this.getLock().release("getNextConnectorLeastConnections");
}
}
public boolean isEmpty() {
return this.connectorMap.isEmpty();
}
public Set<String> getConnectorNames(boolean passiveHealthCheck) {
if(passiveHealthCheck) {
return passiveHealthCheck(this.connectorMap.keySet(), true);
}
else {
return this.connectorMap.keySet();
}
}
public int getWeight(String name) {
return this.connectorMap.get(name);
}
public void addConnector(String name) throws BehaviourException {
this.addConnector(name, DEFAULT_WEIGHT);
}
public void addConnector(String name, int weight) throws BehaviourException {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("addConnector");
try {
if(this.connectorMap.containsKey(name)) {
throw new BehaviourException("Already exists connector '"+name+"'");
}
this.connectorMap.put(name, weight);
this.totalWeight = this.totalWeight+weight;
}finally{
this.getLock().release("addConnector");
}
}
public void registerConnectionError(String name) throws BehaviourException {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("registerConnectionError");
try {
if(this.connectorMap_errorDate.containsKey(name)==false) {
// non aggiorniamo eventualmente la data, teniamo la prima
debug("Registrazione errore di connessione per connettore ["+name+"]");
this.connectorMap_errorDate.put(name, DateManager.getDate());
}
else {
debug("Registrazione non effettuata dell'errore di connessione per connettore ["+name+"]: gia' presente una entry");
}
}finally {
this.getLock().release("registerConnectionError");
}
}
public void addActiveConnection(String name) throws BehaviourException {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("addActiveConnection");
try {
int activeConnections = 0;
if(this.connectorMap_activeConnections.containsKey(name)) {
activeConnections = this.connectorMap_activeConnections.remove(name);
}
activeConnections++;
this.connectorMap_activeConnections.put(name, activeConnections);
debug("Registrazione connessione attiva per connettore ["+name+"] (active:"+activeConnections+")");
}finally {
this.getLock().release("addActiveConnection");
}
}
public void removeActiveConnection(String name) throws BehaviourException {
//synchronized (this.semaphore) {
this.getLock().acquireThrowRuntime("removeActiveConnection");
try {
int activeConnections = 0;
if(this.connectorMap_activeConnections.containsKey(name)) {
activeConnections = this.connectorMap_activeConnections.remove(name);
}
activeConnections--;
if(activeConnections>0) {
this.connectorMap_activeConnections.put(name, activeConnections);
}
debug("Rimozione connessione attiva per connettore ["+name+"] (active:"+activeConnections+")");
}finally {
this.getLock().release("removeActiveConnection");
}
}
protected boolean isPassiveHealthCheck() {
return this.healthCheck!=null && this.healthCheck.isPassiveCheckEnabled() &&
this.healthCheck.getPassiveHealthCheck_excludeForSeconds()!=null &&
this.healthCheck.getPassiveHealthCheck_excludeForSeconds().intValue()>0;
}
private Set<String> passiveHealthCheck(Set<String> set, boolean syncErase){
if(!isPassiveHealthCheck() || this.connectorMap_errorDate.isEmpty()) {
return set;
}
Date now = DateManager.getDate();
debug("Passive Health Check della lista: "+set);
Set<String> newSet = new HashSet<String>();
List<String> listRimuoviDate = new ArrayList<>();
for (String name : set) {
if(this.connectorMap_errorDate.containsKey(name)) {
Date registrationDate = this.connectorMap_errorDate.get(name);
long registrationDateLong = registrationDate.getTime();
long registrationDateExpired = registrationDateLong + (this.healthCheck.getPassiveHealthCheck_excludeForSeconds().intValue() * 1000);
if(registrationDateExpired<now.getTime()) {
debug("(PassiveHealthCheck) Rilevato errore di connessione scaduto per connettore ["+name+"]");
listRimuoviDate.add(name);
}
else {
debug("(PassiveHealthCheck) Rilevato errore di connessione non ancora scaduto per connettore ["+name+"]");
continue; // non e' ancora scaduto
}
}
else {
debug("(PassiveHealthCheck) Non è presente alcun errore di connessione per il connettore ["+name+"]");
}
newSet.add(name);
}
if(listRimuoviDate!=null && !listRimuoviDate.isEmpty()) {
debug("(PassiveHealthCheck) lista di errori di connessione scaduti: "+listRimuoviDate);
if(syncErase) {
//synchronized (this.semaphore) { // un altro thread potrebbe già averlo modificato
this.getLock().acquireThrowRuntime("passiveHealthCheck(date)");
try {
cleanErrorDate(listRimuoviDate, now);
}finally {
this.getLock().release("passiveHealthCheck(date)");
}
}
else {
cleanErrorDate(listRimuoviDate, now);
}
}
if(newSet.isEmpty()) {
// Se tutti i connettori vengono esclusi, non ha senso sospenderli tutti poichè si avrebbe un non servizio anche se poi qualcuno riprende.
// Per questo motivo si ritornano tutti e se re-inizia il giro di verifica.
debug("(PassiveHealthCheck) !!FULL!! tutti i connettori del pool risultano sospesi per errori di connessione: "+this.connectorMap_errorDate.keySet());
Date dateCleaner = DateManager.getDate();
//synchronized (this.semaphore) { // un altro thread potrebbe già averlo modificato
this.getLock().acquireThrowRuntime("passiveHealthCheck(cleanAllErrorDate)");
try {
cleanAllErrorDate(dateCleaner);
}finally {
this.getLock().release("passiveHealthCheck(cleanAllErrorDate)");
}
return set;
}
else {
debug("(PassiveHealthCheck) lista di connettori validi: "+newSet);
}
return newSet;
}
private void cleanErrorDate(List<String> listRimuoviDate, Date now) {
List<String> listDaRimuovere = new ArrayList<>();
for (String name : listRimuoviDate) {
if(this.connectorMap_errorDate.containsKey(name)) {
Date registrationDate = this.connectorMap_errorDate.get(name);
long registrationDateLong = registrationDate.getTime();
long registrationDateExpired = registrationDateLong + (this.healthCheck.getPassiveHealthCheck_excludeForSeconds().intValue() * 1000);
if(registrationDateExpired<now.getTime()) {
debug("Registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]");
listDaRimuovere.add(name);
}
else {
debug("Non registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]: la data di registrazione e' stata aggiornata");
}
}
}
if(!listDaRimuovere.isEmpty()) {
for (String name : listDaRimuovere) {
debug("Elimino l'informazione sull'errore di connessione per il connettore ["+name+"]");
this.connectorMap_errorDate.remove(name);
}
}
}
private void cleanAllErrorDate(Date now) {
debug("(HealthCheck) lista di errori di connessione prima della pulizia totale: "+this.connectorMap_errorDate.keySet());
List<String> listDaRimuovere = new ArrayList<>();
for (String name : this.connectorMap_errorDate.keySet()) {
Date registrationDate = this.connectorMap_errorDate.get(name);
if(registrationDate.before(now)) {
debug("(HealthCheck) Registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]");
listDaRimuovere.add(name);
}
else {
debug("(HealthCheck) Non registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]: la data di registrazione e' stata aggiornata");
}
}
if(!listDaRimuovere.isEmpty()) {
for (String name : listDaRimuovere) {
debug("(HealthCheck) Elimino l'informazione sull'errore di connessione per il connettore ["+name+"]");
this.connectorMap_errorDate.remove(name);
}
}
debug("(HealthCheck) lista di errori di connessione terminata la pulizia totale: "+this.connectorMap_errorDate.keySet());
}
private void debug(String msg) {
if(this.debug) {
OpenSPCoop2Logger.getLoggerOpenSPCoopConnettori().debug(msg);
}
}
}