HazelcastManager.java
- /*
- * GovWay - A customizable API Gateway
- * https://govway.org
- *
- * Copyright (c) 2005-2025 Link.it srl (https://link.it).
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 3, as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast;
- import java.io.ByteArrayInputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.apache.commons.lang.StringUtils;
- import org.openspcoop2.core.controllo_traffico.driver.PolicyException;
- import org.openspcoop2.core.controllo_traffico.driver.PolicyGroupByActiveThreadsType;
- import org.openspcoop2.utils.Utilities;
- import org.openspcoop2.utils.json.JsonPathExpressionEngine;
- import org.openspcoop2.utils.json.JsonPathNotFoundException;
- import org.openspcoop2.utils.json.YAMLUtils;
- import org.openspcoop2.utils.resources.Charset;
- import org.openspcoop2.utils.resources.FileSystemUtilities;
- import org.slf4j.Logger;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.hazelcast.config.Config;
- import com.hazelcast.config.InMemoryYamlConfig;
- import com.hazelcast.config.NetworkConfig;
- import com.hazelcast.config.YamlConfigBuilder;
- import com.hazelcast.core.Hazelcast;
- import com.hazelcast.core.HazelcastInstance;
- /**
- * HazelcastManager
- *
- * @author Francesco Scarlato (scarlato@link.it)
- * @author $Author$
- * @version $Rev$, $Date$
- */
- public class HazelcastManager {
- private static String GOVWAY_INSTANCE_PORT = "GOVWAY_INSTANCE_PORT";
-
- public static Map<PolicyGroupByActiveThreadsType, HazelcastInstance> staticMapInstance = null;
- private static Map<PolicyGroupByActiveThreadsType,String> staticMapConfig = null;
- private static String groupId;
-
- private static YAMLUtils yamlUtils = null;
- private static JsonPathExpressionEngine engine = null;
-
- private static JsonNode sharedConfigNode;
- private static File shareConfigFile;
-
- private static Logger logStartup;
- private static Logger log;
-
- public static synchronized void initialize(Logger logStartup, Logger log, Map<PolicyGroupByActiveThreadsType,String> config, String groupId, File shareConfigFile) throws Exception {
-
- if(HazelcastManager.staticMapInstance==null){
-
- /*
- * This configuration disables the shutdown hook in hazelcast, which ensures that the hazelcast instance shuts down gracefully whenever the product node shuts down.
- * If the hazelcast shutdown hook is enabled (which is the default behavior of a product),
- * you will see errors such as " Hazelcast instance is not active! " at the time of shutting down the product node:
- * This is because the hazelcast instance shuts down too early when the shutdown hook is enabled.
- **/
- System.setProperty("hazelcast.shutdownhook.enabled","false");
-
- /*
- * This configuration sets the hazelcast logging type to log4j2, which allows hazelcast logs to be written to the govway_hazelcast.log file.
- **/
- System.setProperty("hazelcast.logging.type", "log4j2");
-
-
- HazelcastManager.staticMapInstance = new HashMap<PolicyGroupByActiveThreadsType, HazelcastInstance>();
- HazelcastManager.staticMapConfig = new HashMap<PolicyGroupByActiveThreadsType, String>();
- HazelcastManager.groupId = groupId;
- HazelcastManager.logStartup = logStartup;
- HazelcastManager.log = log;
-
- if(shareConfigFile!=null) {
-
- HazelcastManager.shareConfigFile = shareConfigFile;
- HazelcastManager.yamlUtils = YAMLUtils.getInstance();
- HazelcastManager.engine = new JsonPathExpressionEngine();
-
- String sharedContentBytes = FileSystemUtilities.readFile(shareConfigFile);
-
- try {
- HazelcastManager.sharedConfigNode = yamlUtils.getAsNode(sharedContentBytes.getBytes());
- }catch(Throwable t) {
- throw new PolicyException("Configuration '"+shareConfigFile.getAbsolutePath()+"' is not valid yaml config: "+t.getMessage()+"\n"+sharedContentBytes,t);
- }
- Config sharedConfig = null;
- try {
- // La valido ma poi la ricostruiro' tutte le volte per poi sostituire la porta
- try(ByteArrayInputStream bin = new ByteArrayInputStream(sharedContentBytes.getBytes())){
- YamlConfigBuilder builder = new YamlConfigBuilder(bin);
- sharedConfig = builder.build();
- }
- }catch(Throwable t) {
- throw new PolicyException("Configuration '"+shareConfigFile.getAbsolutePath()+"' is not valid yaml config (YamlConfigBuilder): "+t.getMessage()+"\n"+sharedContentBytes,t);
- }
-
- if(sharedConfig!=null &&
- sharedConfig.getClusterName()!=null &&
- StringUtils.isNotEmpty(sharedConfig.getClusterName())) {
- // Se definita sovrascrive il valore indicato nella proprietà 'org.openspcoop2.pdd.controlloTraffico.gestorePolicy.inMemory.HAZELCAST.group_id' in govway_local.properties
- // passato come parametro 'groupId'
- HazelcastManager.groupId = sharedConfig.getClusterName();
- }
- }
-
- if(config!=null && !config.isEmpty()) {
- for (PolicyGroupByActiveThreadsType type : config.keySet()) {
- String pathConfig = config.get(type);
- String content = null;
- if(pathConfig != null) {
- File hazelcastConfigFile = new File(pathConfig);
- if(!hazelcastConfigFile.exists()) {
- //consento di definire la proprietà senza che sia presente il file
- //throw new Exception("Hazelcast file config ["+hazelcastConfigFile.getAbsolutePath()+"] not exists");
- }
- else {
- if(!hazelcastConfigFile.canRead()) {
- throw new Exception("Hazelcast (type:"+type+") file config ["+hazelcastConfigFile.getAbsolutePath()+"] cannot read");
- }
- content = FileSystemUtilities.readFile(hazelcastConfigFile);
- }
- }
- if(content==null) {
- // default
- String name = "";
- switch (type) {
- case HAZELCAST_MAP:
- name = "govway.hazelcast-map.yaml";
- break;
- case HAZELCAST_NEAR_CACHE:
- name = "govway.hazelcast-near-cache.yaml";
- break;
- case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
- name = "govway.hazelcast-near-cache-unsafe-sync-map.yaml";
- break;
- case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
- name = "govway.hazelcast-near-cache-unsafe-async-map.yaml";
- break;
- case HAZELCAST_LOCAL_CACHE:
- name = "govway.hazelcast-local-cache.yaml";
- break;
- case HAZELCAST_REPLICATED_MAP:
- name= "govway.hazelcast-replicated-map.yaml";
- break;
- case HAZELCAST_ATOMIC_LONG:
- name = "govway.hazelcast-atomic-long-counters.yaml";
- break;
- case HAZELCAST_ATOMIC_LONG_ASYNC:
- name = "govway.hazelcast-atomic-long-async-counters.yaml";
- break;
- case HAZELCAST_PNCOUNTER:
- name = "govway.hazelcast-pn-counters.yaml";
- break;
- default:
- throw new Exception("Hazelcast type '"+type+"' unsupported");
- }
- content = Utilities.getAsString(HazelcastManager.class.getResourceAsStream("/"+name),Charset.UTF_8.getValue());
- }
- if(content==null) {
- throw new Exception("Hazelcast (type:"+type+") config undefined");
- }
- else {
- HazelcastManager.staticMapConfig.put(type, content);
- }
- }
- }
- }
- }
-
-
- public static List<PolicyGroupByActiveThreadsType> getTipiGestoriHazelcastAttivi() throws PolicyException{
- if(staticMapInstance==null){
- throw new PolicyException("Nessun gestore Hazelcast inizializzato");
- }
- List<PolicyGroupByActiveThreadsType> l = new ArrayList<PolicyGroupByActiveThreadsType>();
- l.addAll(staticMapInstance.keySet());
- return l;
- }
-
- public static boolean isAttivo(PolicyGroupByActiveThreadsType type) {
- if(staticMapInstance==null){
- return false;
- }
- return staticMapInstance.containsKey(type);
- }
-
- public static HazelcastInstance getInstance(PolicyGroupByActiveThreadsType type) throws PolicyException{
- if(staticMapInstance==null){
- throw new PolicyException("Nessun gestore Hazelcast inizializzato");
- }
- HazelcastInstance gestore = staticMapInstance.get(type);
- if(gestore==null) {
- HazelcastManager.initialize(type);
- gestore = staticMapInstance.get(type);
- }
- if(gestore==null) {
- throw new PolicyException("Gestore Hazelcast '"+type+"' non inizializzato ??");
- }
- return gestore;
- }
-
- private static synchronized void initialize(PolicyGroupByActiveThreadsType type) throws PolicyException{
- if(!HazelcastManager.staticMapInstance.containsKey(type)) {
-
- info("Inizializzazione Gestore Hazelcast '"+type+"' ...");
-
- HazelcastManager.staticMapInstance.put(type, newInstance(type));
-
- info("Inizializzazione Gestore Hazelcast '"+type+"' effettuata con successo");
-
- }
- }
-
- private static synchronized HazelcastInstance newInstance(PolicyGroupByActiveThreadsType type) throws PolicyException {
-
- String content = HazelcastManager.staticMapConfig.get(type);
- if(content==null) {
- throw new PolicyException("Hazelcast config undefined for type '"+type+"'");
- }
-
- String groupId = null;
- switch (type) {
- case HAZELCAST_MAP:
- groupId = HazelcastManager.groupId+"-map";
- break;
- case HAZELCAST_NEAR_CACHE:
- groupId = HazelcastManager.groupId+"-near-cache";
- break;
- case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
- groupId = HazelcastManager.groupId+"-near-cache-unsafe-sync-map";
- break;
- case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
- groupId = HazelcastManager.groupId+"-near-cache-unsafe-async-map";
- break;
- case HAZELCAST_LOCAL_CACHE:
- groupId = HazelcastManager.groupId+"-local-cache";
- break;
- case HAZELCAST_REPLICATED_MAP:
- groupId = HazelcastManager.groupId+"-replicated-map";
- break;
- case HAZELCAST_PNCOUNTER:
- groupId = HazelcastManager.groupId+"-pncounter";
- break;
- case HAZELCAST_ATOMIC_LONG:
- groupId = HazelcastManager.groupId+"-atomic-long";
- break;
- case HAZELCAST_ATOMIC_LONG_ASYNC:
- groupId = HazelcastManager.groupId+"-atomic-long-async";
- break;
- default:
- throw new PolicyException("Hazelcast type '"+type+"' unsupported");
- }
-
- content = content.replace("cluster-name:", "cluster-name: "+groupId+"\n#cluster-name:");
-
- debug("Inizializzo hazelcast con la seguente configurazione (cluster-id"+groupId+"): " + content);
-
- InMemoryYamlConfig hazelcastConfig = new InMemoryYamlConfig(content);
- hazelcastConfig.setClusterName(groupId);
- setNetwork(hazelcastConfig, content, groupId);
-
- HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(hazelcastConfig);
- if(hazelcast==null) {
- throw new PolicyException("Hazelcast init failed");
- }
-
- return hazelcast;
- }
-
- private static void setNetwork(InMemoryYamlConfig hazelcastConfig, String hazelcastConfigContent, String groupId) throws PolicyException {
-
- Config sharedConfig = null;
- if(HazelcastManager.shareConfigFile!=null) {
- try {
- // La valido ma poi la ricostruiro' tutte le volte per poi sostituire la porta
- try(FileInputStream fin = new FileInputStream(HazelcastManager.shareConfigFile)){
- YamlConfigBuilder builder = new YamlConfigBuilder(fin);
- sharedConfig = builder.build();
- }
- }catch(Throwable t) {
- // e' gia' stato validato in fase di inizializzazione. Un errore qua non dovrebbe succedere
- throw new PolicyException("Configuration '"+shareConfigFile.getAbsolutePath()+"' is not valid yaml config (YamlConfigBuilder): "+t.getMessage(),t);
- }
- }
-
- if(sharedConfig!=null && sharedConfig.getNetworkConfig()!=null) {
- NetworkConfig ncShared = sharedConfig.getNetworkConfig();
- NetworkConfig ncInstance = hazelcastConfig.getNetworkConfig();
- Integer portHazelcastConfigInstance = null;
- if(ncInstance==null) {
- // non dovrebbe mai esserlo, essendo definita una porta per ogni tipo di istanza
- ncInstance = new NetworkConfig();
- hazelcastConfig.setNetworkConfig(ncInstance);
- }
- else {
- portHazelcastConfigInstance = hazelcastConfig.getNetworkConfig().getPort();
- }
-
-
- JsonNode instanceNode = null;
- try {
- instanceNode = yamlUtils.getAsNode(hazelcastConfigContent);
- }catch(Throwable t) {
- throw new PolicyException("Configuration '"+groupId+"' is not valid yaml config:\n"+hazelcastConfigContent);
- }
-
- // overrides the public address of a member
- if(ncShared.getPublicAddress()!=null) {
- boolean definedInstance = isDefined("public-address", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setPublicAddress(ncShared.getPublicAddress());
- debug("(cluster-id"+groupId+") override public-address: " + ncInstance.getPublicAddress());
- }
- }
-
- // overrides only defined outbound ports
- if( (ncShared.getOutboundPorts()!=null && !ncShared.getOutboundPorts().isEmpty())
- ||
- (ncShared.getOutboundPortDefinitions()!=null && !ncShared.getOutboundPortDefinitions().isEmpty())) {
- boolean definedInstance = isDefined("outbound-ports", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- if(ncShared.getOutboundPorts()!=null && !ncShared.getOutboundPorts().isEmpty()){
- ncInstance.setOutboundPorts(ncShared.getOutboundPorts());
- debug("(cluster-id"+groupId+") override outbound-ports: " + ncInstance.getOutboundPorts());
- }
- if((ncShared.getOutboundPortDefinitions()!=null && !ncShared.getOutboundPortDefinitions().isEmpty())) {
- ncInstance.setOutboundPortDefinitions(ncShared.getOutboundPortDefinitions());
- debug("(cluster-id"+groupId+") override outbound-ports (definitions): " + ncInstance.getOutboundPortDefinitions());
- }
- }
- }
-
- // If you set the reuse-address element to true, the TIME_WAIT state is ignored and you can bind the member to the same port again
- boolean definedShared = isDefined("reuse-address", yamlUtils, engine, HazelcastManager.sharedConfigNode);
- if(definedShared) {
- boolean definedInstance = isDefined("reuse-address", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setReuseAddress(ncShared.isReuseAddress());
- debug("(cluster-id"+groupId+") override reuse-address: " + ncInstance.isReuseAddress());
- }
- }
-
- // the ports that Hazelcast uses to communicate between cluster members
- //ncShared.getPort();
- //ncShared.getPortCount();
- //ncShared.isPortAutoIncrement();
- // L'unico elemento che viene ignorato se presente è il 'port'.
- // Anche se definito verrà ignorato poichè ogni singola configurazione utilizzata in GovWay richiede una porta dedicata.
- // Per modificarla si deve agire nel file govway.hazelcast-*.yaml presente all'interno dell'archivio govway.ear,
- // modificandolo direttamente dentro l'archivio o riportandolo nella directory di configurazione esterna.
- // override join
- if(ncShared.getJoin()!=null) {
- boolean definedInstance = isDefined("join", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setJoin(ncShared.getJoin());
- if(ncInstance.getJoin().getTcpIpConfig()!=null) {
- if(portHazelcastConfigInstance!=null && portHazelcastConfigInstance.intValue()>0) {
- if(ncInstance.getJoin().getTcpIpConfig().getMembers()!=null && !ncInstance.getJoin().getTcpIpConfig().getMembers().isEmpty()) {
- // Se vengono usate le porte, devo clonare la lista e sistemarla con la porta usata sull'attuale istanza
- List<String> newList = new ArrayList<>();
- for (String member : ncInstance.getJoin().getTcpIpConfig().getMembers()) {
- try {
- if(member.contains(GOVWAY_INSTANCE_PORT)) {
- newList.add(member.replace(GOVWAY_INSTANCE_PORT, portHazelcastConfigInstance.intValue()+""));
- }
- else if(member.contains(GOVWAY_INSTANCE_PORT.toLowerCase())) {
- newList.add(member.replace(GOVWAY_INSTANCE_PORT.toLowerCase(), portHazelcastConfigInstance.intValue()+""));
- }
- else {
- newList.add(member);
- }
- /*
- int indexOfFirst = member.indexOf(":");
- if(indexOfFirst>0 && indexOfFirst!=(member.length()-1)) {
- int indexOfSecondIpv6 = member.indexOf(":",(indexOfFirst+1));
- if(indexOfSecondIpv6<0) {
- usePortIpv4=true;
- }
- }
- if(usePortIpv4) {
- String [] split = member.split(":");
- String newIp = split[0] + ":" + portHazelcastConfigInstance.intValue();
- newList.add(newIp);
- }
- else {
- newList.add(member);
- }*/
- }catch(Throwable t) {
- error("(cluster-id"+groupId+") tpcip analisi member '"+member+"' fallita: " +t.getMessage(),t);
- //newList.add(member);
- }
- }
- ncInstance.getJoin().getTcpIpConfig().setMembers(newList);
- }
- }
- }
- debug("(cluster-id"+groupId+") override join: " + ncInstance.getJoin());
- }
- }
-
- // override interface
- if(ncShared.getInterfaces()!=null && ncShared.getInterfaces().isEnabled()) {
- boolean definedInstance = isDefined("interfaces", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setInterfaces(ncShared.getInterfaces());
- debug("(cluster-id"+groupId+") override interfaces: " + ncInstance.getInterfaces());
- }
- }
-
- // override altre configurazioni avanzate
- // example yaml: https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java
- if(ncShared.getIcmpFailureDetectorConfig()!=null && ncShared.getIcmpFailureDetectorConfig().isEnabled()) {
- boolean definedInstance = isDefined("failure-detector", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setIcmpFailureDetectorConfig(ncShared.getIcmpFailureDetectorConfig());
- debug("(cluster-id"+groupId+") override failure-detector icmp: " + ncInstance.getIcmpFailureDetectorConfig());
- }
- }
- if(ncShared.getMemberAddressProviderConfig()!=null && ncShared.getMemberAddressProviderConfig().isEnabled()) {
- boolean definedInstance = isDefined("member-address-provider", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setMemberAddressProviderConfig(ncShared.getMemberAddressProviderConfig());
- debug("(cluster-id"+groupId+") override member-address-provider: " + ncInstance.getMemberAddressProviderConfig());
- }
- }
- if(ncShared.getMemcacheProtocolConfig()!=null && ncShared.getMemcacheProtocolConfig().isEnabled()) {
- boolean definedInstance = isDefined("memcache-protocol", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setMemcacheProtocolConfig(ncShared.getMemcacheProtocolConfig());
- debug("(cluster-id"+groupId+") override memcache-protocol: " + ncInstance.getMemcacheProtocolConfig());
- }
- }
- if(ncShared.getRestApiConfig()!=null && ncShared.getRestApiConfig().isEnabled()) {
- boolean definedInstance = isDefined("rest-api", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setRestApiConfig(ncShared.getRestApiConfig());
- debug("(cluster-id"+groupId+") override rest-api: " + ncInstance.getRestApiConfig());
- }
- }
- if(ncShared.getSocketInterceptorConfig()!=null && ncShared.getSocketInterceptorConfig().getClassName()!=null) {
- boolean definedInstance = isDefined("socket-interceptor", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setSocketInterceptorConfig(ncShared.getSocketInterceptorConfig());
- debug("(cluster-id"+groupId+") override socket-interceptor: " + ncInstance.getSocketInterceptorConfig());
- }
- }
- if(ncShared.getSSLConfig()!=null && ncShared.getSSLConfig().isEnabled()) {
- boolean definedInstance = isDefined("ssl", yamlUtils, engine, instanceNode);
- if(!definedInstance) {
- ncInstance.setSSLConfig(ncShared.getSSLConfig());
- debug("(cluster-id"+groupId+") override ssl: " + ncInstance.getSSLConfig());
- }
- }
- /* Deprecata
- if(ncShared.getSymmetricEncryptionConfig()!=null && ncShared.getSymmetricEncryptionConfig().isEnabled()) {
- boolean definedInstance = isDefined("symmetric-encryption", yamlUtils, engine, node);
- if(!definedInstance) {
- ncInstance.setSymmetricEncryptionConfig(ncShared.getSymmetricEncryptionConfig());
- debug("(cluster-id"+groupId+") override symmetric-encryption: " + ncInstance.getSymmetricEncryptionConfig());
- }
- }
- */
-
- }
- }
-
- private static boolean isDefined(String pattern, YAMLUtils yamlUtils, JsonPathExpressionEngine engine, JsonNode node) throws PolicyException {
- String prefixPattern = "$.hazelcast.network.";
- try {
- JsonNode result = engine.getJsonNodeMatchPattern(node, prefixPattern+pattern);
- return result!=null;
- }catch(JsonPathNotFoundException notFound) {
- return false;
- }catch(Throwable t) {
- throw new PolicyException(t.getMessage(),t);
- }
- }
-
- private static void debug(String msg) {
- if(HazelcastManager.log!=null) {
- HazelcastManager.log.debug(msg);
- }
- if(HazelcastManager.logStartup!=null) {
- HazelcastManager.logStartup.debug(msg);
- }
- }
- private static void info(String msg) {
- if(HazelcastManager.log!=null) {
- HazelcastManager.log.info(msg);
- }
- if(HazelcastManager.logStartup!=null) {
- HazelcastManager.logStartup.info(msg);
- }
- }
- private static void error(String msg, Throwable e) {
- if(HazelcastManager.log!=null) {
- HazelcastManager.log.error(msg,e);
- }
- if(HazelcastManager.logStartup!=null) {
- HazelcastManager.logStartup.error(msg,e);
- }
- }
-
- public static synchronized void close() {
- if(HazelcastManager.staticMapInstance!=null && !HazelcastManager.staticMapInstance.isEmpty()) {
- for (PolicyGroupByActiveThreadsType type : HazelcastManager.staticMapInstance.keySet()) {
- HazelcastInstance hazelcast = HazelcastManager.staticMapInstance.get(type);
- try {
- hazelcast.shutdown();
- }catch(Throwable t) {
- HazelcastManager.log.debug("Hazelcast '"+type+"' shutdown failed: " + t.getMessage(),t);
- }
- }
- }
- }
-
- }