LoadBalancerPool.java

  1. /*
  2.  * GovWay - A customizable API Gateway
  3.  * https://govway.org
  4.  *
  5.  * Copyright (c) 2005-2025 Link.it srl (https://link.it).
  6.  *
  7.  * This program is free software: you can redistribute it and/or modify
  8.  * it under the terms of the GNU General Public License version 3, as published by
  9.  * the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License
  17.  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18.  *
  19.  */

  20. package org.openspcoop2.pdd.core.behaviour.built_in.load_balance;

  21. import java.io.Serializable;
  22. import java.util.ArrayList;
  23. import java.util.Date;
  24. import java.util.HashMap;
  25. import java.util.HashSet;
  26. import java.util.Iterator;
  27. import java.util.List;
  28. import java.util.Map;
  29. import java.util.Set;

  30. import org.openspcoop2.pdd.config.OpenSPCoop2Properties;
  31. import org.openspcoop2.pdd.core.behaviour.BehaviourException;
  32. import org.openspcoop2.pdd.core.behaviour.built_in.load_balance.health_check.HealthCheckConfigurazione;
  33. import org.openspcoop2.pdd.logger.OpenSPCoop2Logger;
  34. import org.openspcoop2.utils.SemaphoreLock;
  35. import org.openspcoop2.utils.date.DateManager;
  36. import org.openspcoop2.utils.date.DateUtils;

  37. /**
  38.  * LoadBalancerPool
  39.  *
  40.  * @author Andrea Poli (apoli@link.it)
  41.  * @author $Author$
  42.  * @version $Rev$, $Date$
  43.  */
  44. public class LoadBalancerPool implements Serializable{

  45.     /**
  46.      *
  47.      */
  48.     private static final long serialVersionUID = 1L;
  49.    
  50.     public static int DEFAULT_WEIGHT = 1;
  51.    
  52.     private HealthCheckConfigurazione healthCheck = null;
  53.     private boolean debug = false;
  54.    
  55.     public LoadBalancerPool(HealthCheckConfigurazione healthCheck) {
  56.         this.healthCheck = healthCheck;
  57.         this.debug = OpenSPCoop2Properties.getInstance().isLoadBalancerDebug();
  58.     }
  59.    
  60.     @Override
  61.     public String toString() {
  62.         //synchronized (this.semaphore) {
  63.         SemaphoreLock lock = this.getLock().acquireThrowRuntime("toString");
  64.         try {
  65.             StringBuilder bf = new StringBuilder();
  66.             bf.append("Connectors: ").append(this.connectorMap.size());
  67.             bf.append("\nTotal Weight: ").append(this.totalWeight);
  68.             bf.append("\nPosition: ").append(this.position);
  69.             if(this.healthCheck!=null) {
  70.                 bf.append("\nPassiveHealtCheck: ").append(this.healthCheck.isPassiveCheckEnabled());
  71.                 if(this.healthCheck.isPassiveCheckEnabled()){
  72.                     bf.append("\n  Exclude for seconds: ").append(this.healthCheck.getPassiveHealthCheck_excludeForSeconds());
  73.                 }
  74.             }
  75.             for (String name : this.connectorMap.keySet()) {
  76.                 bf.append("\n");
  77.                 bf.append("- ").append(name).append(" : ").append(" ( weight:").append(this.connectorMap.get(name));
  78.                 if(this.connectorMap_activeConnections.containsKey(name)) {
  79.                     bf.append(" activeConnections:").append(this.connectorMap_activeConnections.get(name));
  80.                 }
  81.                 if(this.connectorMap_errorDate.containsKey(name)) {
  82.                     bf.append(" connectionError:").append(DateUtils.getSimpleDateFormatMs().format(this.connectorMap_errorDate.get(name)));
  83.                 }
  84.                 bf.append(" )");
  85.             }
  86.             return bf.toString();
  87.         }finally {
  88.             this.getLock().release(lock, "toString");
  89.         }
  90.     }
  91.    
  92.    
  93.     //protected Boolean semaphore = true;
  94.     private transient org.openspcoop2.utils.Semaphore _lock = null;
  95.     private synchronized void initLock() {
  96.         if(this._lock==null) {
  97.             this._lock = new org.openspcoop2.utils.Semaphore("LoadBalancerPool");
  98.         }
  99.     }
  100.     public org.openspcoop2.utils.Semaphore getLock(){
  101.         if(this._lock==null) {
  102.             initLock();
  103.         }
  104.         return this._lock;
  105.     }
  106.     protected Map<String, Integer> connectorMap = new HashMap<>();
  107.     protected Map<String, Integer> connectorMap_activeConnections = new HashMap<>();
  108.     protected Map<String, Date> connectorMap_errorDate = new HashMap<>();
  109.     private int totalWeight = 0;
  110.    
  111.     private int position = -1;
  112.    
  113.    
  114.    

  115.     public int getNextPosition(boolean checkByWeight) throws BehaviourException {
  116.        
  117.         if(!isPassiveHealthCheck()) {
  118.             //synchronized (this.semaphore) {
  119.             SemaphoreLock lock = this.getLock().acquireThrowRuntime("getNextPosition(active)");
  120.             try {
  121.                 return _getNextPosition(checkByWeight);
  122.             }finally {
  123.                 this.getLock().release(lock, "getNextPosition(active)");
  124.             }
  125.         }
  126.         else {
  127.             //synchronized (this.semaphore) {
  128.             SemaphoreLock lock = this.getLock().acquireThrowRuntime("getNextPosition(passive)");
  129.             try {
  130.                 int pos = _getNextPosition(checkByWeight);
  131.                
  132.                 Set<String> setOriginal = this.connectorMap.keySet();
  133.                 List<String> serverList = new ArrayList<>();
  134.                 if(checkByWeight) {
  135.                     serverList.addAll(this.getWeightList(false));
  136.                 }
  137.                 else {
  138.                     serverList.addAll(setOriginal);
  139.                 }
  140.                
  141.                 Set<String> setAfterPassiveHealthCheck = passiveHealthCheck(setOriginal, false);
  142.                
  143.                 // prima verifica
  144.                 String selectedConnector = serverList.get(pos);
  145.                 if(setAfterPassiveHealthCheck.contains(selectedConnector)) {
  146.                     return pos;
  147.                 }
  148.                
  149.                 // controllo prossime posizioni fino a tornare a quella attuale
  150.                 int nextPos = _getNextPosition(checkByWeight);
  151.                 while(nextPos!=pos) {
  152.                     selectedConnector = serverList.get(nextPos);
  153.                     if(setAfterPassiveHealthCheck.contains(selectedConnector)) {
  154.                         return nextPos;
  155.                     }
  156.                     nextPos = _getNextPosition(checkByWeight);
  157.                 }
  158.                
  159.                 throw new BehaviourException("Nessun connettore selezionabile (passive health check)");
  160.             }finally {
  161.                 this.getLock().release(lock, "getNextPosition(passive)");
  162.             }
  163.         }
  164.        
  165.     }
  166.     private int _getNextPosition(boolean checkByWeight) {
  167.         this.position++;
  168.         if(checkByWeight) {
  169.             if(this.position==this.totalWeight) {
  170.                 this.position = 0;
  171.             }
  172.         }
  173.         else {
  174.             if(this.position==this.connectorMap.size()) {
  175.                 this.position = 0;
  176.             }
  177.         }
  178.         return this.position;
  179.     }
  180.    
  181.    
  182.     public List<String> getWeightList(boolean passiveHealthCheck) throws BehaviourException {
  183.         Set<String> servers = this.getConnectorNames(passiveHealthCheck);
  184.         if(servers.isEmpty()) {
  185.             throw new BehaviourException("Nessun connettore selezionabile (passive health check)");
  186.         }
  187.         List<String> serverList = new ArrayList<>();    

  188.         Iterator<String> iterator = servers.iterator();
  189.         while (iterator.hasNext()) {
  190.             String server = iterator.next();
  191.             Integer weight = this.getWeight(server);
  192.             if (weight == null || weight <= 0) {
  193.                 weight = LoadBalancerPool.DEFAULT_WEIGHT;
  194.             }
  195.             for (int i = 0; i < weight; i++) {
  196.                 serverList.add(server);
  197.             }
  198.         }

  199.         debug("weightList (passiveHealthCheck:"+passiveHealthCheck+"): "+serverList);
  200.        
  201.         return serverList;
  202.     }
  203.    

  204.     private transient org.openspcoop2.utils.Semaphore _lockLeastConnectionsIndex = null;
  205.     private synchronized void initLockLeastConnectionsIndex() {
  206.         if(this._lockLeastConnectionsIndex==null) {
  207.             this._lockLeastConnectionsIndex = new org.openspcoop2.utils.Semaphore("LoadBalancerPoolLeastConnections");
  208.         }
  209.     }
  210.     public org.openspcoop2.utils.Semaphore getLockLeastConnectionsIndex(){
  211.         if(this._lockLeastConnectionsIndex==null) {
  212.             initLockLeastConnectionsIndex();
  213.         }
  214.         return this._lockLeastConnectionsIndex;
  215.     }
  216.     private int leastConnectionsIndex = 0;
  217.     private String getNextLeastConnectionsConnector(int min, List<String> listMin) {
  218.         if(listMin==null || listMin.isEmpty()) {
  219.             return null;
  220.         }
  221.         // Nel caso vi siano più connettori che sono con lo stesso numero di connessioni, viene effettuato un roundrobin
  222.         // Serve a evitare che se arrivano richieste simultanee prima della registrazione della nuova connessione (che avviene dopo non in maniera transazione)
  223.         // viene scelto il solito connettore
  224.         SemaphoreLock lock = this.getLockLeastConnectionsIndex().acquireThrowRuntime("getNextLeastConnectionsIndex");
  225.         try {
  226.             int c = 0;
  227.             if(this.leastConnectionsIndex<listMin.size()) {
  228.                 c = this.leastConnectionsIndex;
  229.             }
  230.             this.leastConnectionsIndex++;
  231.            
  232.             debug("getNextConnectorLeastConnections minActiveConnections["+min+"] (ConnettoreSelezionato:"+c+"): "+listMin);

  233.             return listMin.get(c);
  234.         }finally{
  235.             this.getLockLeastConnectionsIndex().release(lock, "getNextLeastConnectionsIndex");
  236.         }
  237.     }
  238.    
  239.     public String getNextConnectorLeastConnections() {
  240.         //synchronized (this.semaphore) {
  241.         SemaphoreLock lock = this.getLock().acquireThrowRuntime("getNextConnectorLeastConnections");
  242.         try {
  243.             debug("getNextConnectorLeastConnections situazione iniziale ("+this.connectorMap_activeConnections+")");
  244.             Set<String> setKeys = passiveHealthCheck(this.connectorMap.keySet(), false);
  245.            
  246.             List<String> listMin = new ArrayList<>();
  247.             int min = 0;
  248.             if(!this.connectorMap_activeConnections.isEmpty()) {
  249.                 min = Integer.MAX_VALUE;
  250.                 for (String name : setKeys) {
  251.                     if(this.connectorMap_activeConnections.containsKey(name)==false) {
  252.                         if(min != 0) {
  253.                             min = 0;
  254.                             listMin.clear();
  255.                         }
  256.                         listMin.add(name);
  257.                     }
  258.                     else {
  259.                         int active = this.connectorMap_activeConnections.get(name);
  260.                         if(active<min) {
  261.                             min = active;
  262.                             listMin.clear();
  263.                             listMin.add(name);
  264.                         }
  265.                         else if(active==min) {
  266.                             listMin.add(name);
  267.                         }
  268.                     }
  269.                 }
  270.             }

  271.             if(listMin.isEmpty()) {
  272.                 listMin.addAll(setKeys);
  273.                 debug("getNextConnectorLeastConnections: list is empty");
  274.                
  275.             }
  276.            
  277.             return getNextLeastConnectionsConnector(min, listMin);
  278.            
  279.         }finally{
  280.             this.getLock().release(lock, "getNextConnectorLeastConnections");
  281.         }
  282.     }
  283.    
  284.     public boolean isEmpty() {
  285.         return this.connectorMap.isEmpty();
  286.     }
  287.    
  288.     public Set<String> getConnectorNames(boolean passiveHealthCheck) {
  289.         if(passiveHealthCheck) {
  290.             return passiveHealthCheck(this.connectorMap.keySet(), true);
  291.         }
  292.         else {
  293.             return this.connectorMap.keySet();
  294.         }
  295.     }
  296.    
  297.     public int getWeight(String name) {
  298.         return this.connectorMap.get(name);
  299.     }
  300.    
  301.     public void addConnector(String name) throws BehaviourException {
  302.         this.addConnector(name, DEFAULT_WEIGHT);
  303.     }
  304.     public void addConnector(String name, int weight) throws BehaviourException {
  305.         //synchronized (this.semaphore) {
  306.         SemaphoreLock lock = this.getLock().acquireThrowRuntime("addConnector");
  307.         try {
  308.             if(this.connectorMap.containsKey(name)) {
  309.                 throw new BehaviourException("Already exists connector '"+name+"'");
  310.             }
  311.             this.connectorMap.put(name, weight);
  312.             this.totalWeight = this.totalWeight+weight;
  313.         }finally{
  314.             this.getLock().release(lock, "addConnector");
  315.         }
  316.        
  317.     }
  318.    
  319.    
  320.     public void registerConnectionError(String name) throws BehaviourException {
  321.         //synchronized (this.semaphore) {
  322.         SemaphoreLock lock = this.getLock().acquireThrowRuntime("registerConnectionError");
  323.         try {
  324.             if(this.connectorMap_errorDate.containsKey(name)==false) {
  325.                 // non aggiorniamo eventualmente la data, teniamo la prima
  326.                 debug("Registrazione errore di connessione per connettore ["+name+"]");
  327.                 this.connectorMap_errorDate.put(name, DateManager.getDate());
  328.             }
  329.             else {
  330.                 debug("Registrazione non effettuata dell'errore di connessione per connettore ["+name+"]: gia' presente una entry");
  331.             }
  332.         }finally {
  333.             this.getLock().release(lock, "registerConnectionError");
  334.         }
  335.     }

  336.     public void addActiveConnection(String name) throws BehaviourException {
  337.         //synchronized (this.semaphore) {
  338.         SemaphoreLock lock = this.getLock().acquireThrowRuntime("addActiveConnection");
  339.         try {
  340.             int activeConnections = 0;
  341.             if(this.connectorMap_activeConnections.containsKey(name)) {
  342.                 activeConnections = this.connectorMap_activeConnections.remove(name);
  343.             }
  344.             activeConnections++;
  345.             this.connectorMap_activeConnections.put(name, activeConnections);
  346.             debug("Registrazione connessione attiva per connettore ["+name+"] (active:"+activeConnections+")");
  347.         }finally {
  348.             this.getLock().release(lock, "addActiveConnection");
  349.         }
  350.     }
  351.     public void removeActiveConnection(String name) throws BehaviourException {
  352.         //synchronized (this.semaphore) {
  353.         SemaphoreLock lock = this.getLock().acquireThrowRuntime("removeActiveConnection");
  354.         try {
  355.             int activeConnections = 0;
  356.             if(this.connectorMap_activeConnections.containsKey(name)) {
  357.                 activeConnections = this.connectorMap_activeConnections.remove(name);
  358.             }
  359.             activeConnections--;
  360.             if(activeConnections>0) {
  361.                 this.connectorMap_activeConnections.put(name, activeConnections);
  362.             }
  363.             debug("Rimozione connessione attiva per connettore ["+name+"] (active:"+activeConnections+")");
  364.         }finally {
  365.             this.getLock().release(lock, "removeActiveConnection");
  366.         }
  367.     }
  368.    
  369.    
  370.     protected boolean isPassiveHealthCheck() {
  371.         return this.healthCheck!=null && this.healthCheck.isPassiveCheckEnabled() &&
  372.                 this.healthCheck.getPassiveHealthCheck_excludeForSeconds()!=null &&
  373.                 this.healthCheck.getPassiveHealthCheck_excludeForSeconds().intValue()>0;
  374.     }
  375.    
  376.     private Set<String> passiveHealthCheck(Set<String> set, boolean syncErase){
  377.         if(!isPassiveHealthCheck() || this.connectorMap_errorDate.isEmpty()) {
  378.             return set;
  379.         }
  380.        
  381.         Date now = DateManager.getDate();
  382.        
  383.         debug("Passive Health Check della lista: "+set);
  384.        
  385.         Set<String> newSet = new HashSet<String>();
  386.         List<String> listRimuoviDate = new ArrayList<>();
  387.        
  388.         for (String name : set) {
  389.             if(this.connectorMap_errorDate.containsKey(name)) {
  390.                 Date registrationDate = this.connectorMap_errorDate.get(name);
  391.                 long registrationDateLong = registrationDate.getTime();
  392.                 long registrationDateExpired = registrationDateLong + (this.healthCheck.getPassiveHealthCheck_excludeForSeconds().intValue() * 1000);
  393.                 if(registrationDateExpired<now.getTime()) {
  394.                     debug("(PassiveHealthCheck) Rilevato errore di connessione scaduto per connettore ["+name+"]");
  395.                     listRimuoviDate.add(name);
  396.                 }
  397.                 else {
  398.                     debug("(PassiveHealthCheck) Rilevato errore di connessione non ancora scaduto per connettore ["+name+"]");
  399.                     continue; // non e' ancora scaduto
  400.                 }
  401.             }
  402.             else {
  403.                 debug("(PassiveHealthCheck) Non è presente alcun errore di connessione per il connettore ["+name+"]");
  404.             }
  405.                        
  406.             newSet.add(name);
  407.         }
  408.        
  409.         if(listRimuoviDate!=null && !listRimuoviDate.isEmpty()) {
  410.             debug("(PassiveHealthCheck) lista di errori di connessione scaduti: "+listRimuoviDate);
  411.             if(syncErase) {
  412.                 //synchronized (this.semaphore) { // un altro thread potrebbe già averlo modificato
  413.                 SemaphoreLock lock = this.getLock().acquireThrowRuntime("passiveHealthCheck(date)");
  414.                 try {
  415.                     cleanErrorDate(listRimuoviDate, now);
  416.                 }finally {
  417.                     this.getLock().release(lock, "passiveHealthCheck(date)");
  418.                 }
  419.             }
  420.             else {
  421.                 cleanErrorDate(listRimuoviDate, now);
  422.             }
  423.         }
  424.        
  425.         if(newSet.isEmpty()) {
  426.             // Se tutti i connettori vengono esclusi, non ha senso sospenderli tutti poichè si avrebbe un non servizio anche se poi qualcuno riprende.
  427.             // Per questo motivo si ritornano tutti e se re-inizia il giro di verifica.
  428.             debug("(PassiveHealthCheck) !!FULL!! tutti i connettori del pool risultano sospesi per errori di connessione: "+this.connectorMap_errorDate.keySet());
  429.             Date dateCleaner = DateManager.getDate();
  430.             //synchronized (this.semaphore) { // un altro thread potrebbe già averlo modificato
  431.             SemaphoreLock lock = this.getLock().acquireThrowRuntime("passiveHealthCheck(cleanAllErrorDate)");
  432.             try {
  433.                 cleanAllErrorDate(dateCleaner);
  434.             }finally {
  435.                 this.getLock().release(lock, "passiveHealthCheck(cleanAllErrorDate)");
  436.             }
  437.             return set;
  438.         }
  439.         else {
  440.             debug("(PassiveHealthCheck) lista di connettori validi: "+newSet);
  441.         }
  442.        
  443.         return newSet;
  444.     }
  445.     private void cleanErrorDate(List<String> listRimuoviDate, Date now) {
  446.         List<String> listDaRimuovere = new ArrayList<>();
  447.         for (String name : listRimuoviDate) {
  448.             if(this.connectorMap_errorDate.containsKey(name)) {
  449.                 Date registrationDate = this.connectorMap_errorDate.get(name);
  450.                 long registrationDateLong = registrationDate.getTime();
  451.                 long registrationDateExpired = registrationDateLong + (this.healthCheck.getPassiveHealthCheck_excludeForSeconds().intValue() * 1000);
  452.                 if(registrationDateExpired<now.getTime()) {
  453.                     debug("Registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]");
  454.                     listDaRimuovere.add(name);
  455.                 }
  456.                 else {
  457.                     debug("Non registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]: la data di registrazione e' stata aggiornata");
  458.                 }
  459.             }
  460.         }
  461.         if(!listDaRimuovere.isEmpty()) {
  462.             for (String name : listDaRimuovere) {
  463.                 debug("Elimino l'informazione sull'errore di connessione per il connettore ["+name+"]");
  464.                 this.connectorMap_errorDate.remove(name);
  465.             }
  466.         }
  467.     }
  468.     private void cleanAllErrorDate(Date now) {
  469.         debug("(HealthCheck) lista di errori di connessione prima della pulizia totale: "+this.connectorMap_errorDate.keySet());
  470.         List<String> listDaRimuovere = new ArrayList<>();
  471.         for (String name : this.connectorMap_errorDate.keySet()) {
  472.             Date registrationDate = this.connectorMap_errorDate.get(name);
  473.             if(registrationDate.before(now)) {
  474.                 debug("(HealthCheck) Registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]");
  475.                 listDaRimuovere.add(name);
  476.             }
  477.             else {
  478.                 debug("(HealthCheck) Non registro da eliminare l'informazione sull'errore di connessione per il connettore ["+name+"]: la data di registrazione e' stata aggiornata");
  479.             }
  480.         }
  481.         if(!listDaRimuovere.isEmpty()) {
  482.             for (String name : listDaRimuovere) {
  483.                 debug("(HealthCheck) Elimino l'informazione sull'errore di connessione per il connettore ["+name+"]");
  484.                 this.connectorMap_errorDate.remove(name);
  485.             }
  486.         }
  487.         debug("(HealthCheck) lista di errori di connessione terminata la pulizia totale: "+this.connectorMap_errorDate.keySet());
  488.     }
  489.    
  490.    
  491.     private void debug(String msg) {
  492.         if(this.debug) {
  493.             OpenSPCoop2Logger.getLoggerOpenSPCoopConnettori().debug(msg);
  494.         }
  495.     }
  496. }