HazelcastManager.java
/*
* GovWay - A customizable API Gateway
* https://govway.org
*
* Copyright (c) 2005-2024 Link.it srl (https://link.it).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3, as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.openspcoop2.pdd.core.controllo_traffico.policy.driver.hazelcast;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.openspcoop2.core.controllo_traffico.driver.PolicyException;
import org.openspcoop2.core.controllo_traffico.driver.PolicyGroupByActiveThreadsType;
import org.openspcoop2.utils.Utilities;
import org.openspcoop2.utils.json.JsonPathExpressionEngine;
import org.openspcoop2.utils.json.JsonPathNotFoundException;
import org.openspcoop2.utils.json.YAMLUtils;
import org.openspcoop2.utils.resources.Charset;
import org.openspcoop2.utils.resources.FileSystemUtilities;
import org.slf4j.Logger;
import com.fasterxml.jackson.databind.JsonNode;
import com.hazelcast.config.Config;
import com.hazelcast.config.InMemoryYamlConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.YamlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
/**
* HazelcastManager
*
* @author Francesco Scarlato (scarlato@link.it)
* @author $Author$
* @version $Rev$, $Date$
*/
public class HazelcastManager {
private static String GOVWAY_INSTANCE_PORT = "GOVWAY_INSTANCE_PORT";
public static Map<PolicyGroupByActiveThreadsType, HazelcastInstance> staticMapInstance = null;
private static Map<PolicyGroupByActiveThreadsType,String> staticMapConfig = null;
private static String groupId;
private static YAMLUtils yamlUtils = null;
private static JsonPathExpressionEngine engine = null;
private static JsonNode sharedConfigNode;
private static File shareConfigFile;
private static Logger logStartup;
private static Logger log;
public static synchronized void initialize(Logger logStartup, Logger log, Map<PolicyGroupByActiveThreadsType,String> config, String groupId, File shareConfigFile) throws Exception {
if(HazelcastManager.staticMapInstance==null){
/*
* This configuration disables the shutdown hook in hazelcast, which ensures that the hazelcast instance shuts down gracefully whenever the product node shuts down.
* If the hazelcast shutdown hook is enabled (which is the default behavior of a product),
* you will see errors such as " Hazelcast instance is not active! " at the time of shutting down the product node:
* This is because the hazelcast instance shuts down too early when the shutdown hook is enabled.
**/
System.setProperty("hazelcast.shutdownhook.enabled","false");
/*
* This configuration sets the hazelcast logging type to log4j2, which allows hazelcast logs to be written to the govway_hazelcast.log file.
**/
System.setProperty("hazelcast.logging.type", "log4j2");
HazelcastManager.staticMapInstance = new HashMap<PolicyGroupByActiveThreadsType, HazelcastInstance>();
HazelcastManager.staticMapConfig = new HashMap<PolicyGroupByActiveThreadsType, String>();
HazelcastManager.groupId = groupId;
HazelcastManager.logStartup = logStartup;
HazelcastManager.log = log;
if(shareConfigFile!=null) {
HazelcastManager.shareConfigFile = shareConfigFile;
HazelcastManager.yamlUtils = YAMLUtils.getInstance();
HazelcastManager.engine = new JsonPathExpressionEngine();
String sharedContentBytes = FileSystemUtilities.readFile(shareConfigFile);
try {
HazelcastManager.sharedConfigNode = yamlUtils.getAsNode(sharedContentBytes.getBytes());
}catch(Throwable t) {
throw new PolicyException("Configuration '"+shareConfigFile.getAbsolutePath()+"' is not valid yaml config: "+t.getMessage()+"\n"+sharedContentBytes,t);
}
Config sharedConfig = null;
try {
// La valido ma poi la ricostruiro' tutte le volte per poi sostituire la porta
try(ByteArrayInputStream bin = new ByteArrayInputStream(sharedContentBytes.getBytes())){
YamlConfigBuilder builder = new YamlConfigBuilder(bin);
sharedConfig = builder.build();
}
}catch(Throwable t) {
throw new PolicyException("Configuration '"+shareConfigFile.getAbsolutePath()+"' is not valid yaml config (YamlConfigBuilder): "+t.getMessage()+"\n"+sharedContentBytes,t);
}
if(sharedConfig!=null &&
sharedConfig.getClusterName()!=null &&
StringUtils.isNotEmpty(sharedConfig.getClusterName())) {
// Se definita sovrascrive il valore indicato nella proprietà 'org.openspcoop2.pdd.controlloTraffico.gestorePolicy.inMemory.HAZELCAST.group_id' in govway_local.properties
// passato come parametro 'groupId'
HazelcastManager.groupId = sharedConfig.getClusterName();
}
}
if(config!=null && !config.isEmpty()) {
for (PolicyGroupByActiveThreadsType type : config.keySet()) {
String pathConfig = config.get(type);
String content = null;
if(pathConfig != null) {
File hazelcastConfigFile = new File(pathConfig);
if(!hazelcastConfigFile.exists()) {
//consento di definire la proprietà senza che sia presente il file
//throw new Exception("Hazelcast file config ["+hazelcastConfigFile.getAbsolutePath()+"] not exists");
}
else {
if(!hazelcastConfigFile.canRead()) {
throw new Exception("Hazelcast (type:"+type+") file config ["+hazelcastConfigFile.getAbsolutePath()+"] cannot read");
}
content = FileSystemUtilities.readFile(hazelcastConfigFile);
}
}
if(content==null) {
// default
String name = "";
switch (type) {
case HAZELCAST_MAP:
name = "govway.hazelcast-map.yaml";
break;
case HAZELCAST_NEAR_CACHE:
name = "govway.hazelcast-near-cache.yaml";
break;
case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
name = "govway.hazelcast-near-cache-unsafe-sync-map.yaml";
break;
case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
name = "govway.hazelcast-near-cache-unsafe-async-map.yaml";
break;
case HAZELCAST_LOCAL_CACHE:
name = "govway.hazelcast-local-cache.yaml";
break;
case HAZELCAST_REPLICATED_MAP:
name= "govway.hazelcast-replicated-map.yaml";
break;
case HAZELCAST_ATOMIC_LONG:
name = "govway.hazelcast-atomic-long-counters.yaml";
break;
case HAZELCAST_ATOMIC_LONG_ASYNC:
name = "govway.hazelcast-atomic-long-async-counters.yaml";
break;
case HAZELCAST_PNCOUNTER:
name = "govway.hazelcast-pn-counters.yaml";
break;
default:
throw new Exception("Hazelcast type '"+type+"' unsupported");
}
content = Utilities.getAsString(HazelcastManager.class.getResourceAsStream("/"+name),Charset.UTF_8.getValue());
}
if(content==null) {
throw new Exception("Hazelcast (type:"+type+") config undefined");
}
else {
HazelcastManager.staticMapConfig.put(type, content);
}
}
}
}
}
public static List<PolicyGroupByActiveThreadsType> getTipiGestoriHazelcastAttivi() throws PolicyException{
if(staticMapInstance==null){
throw new PolicyException("Nessun gestore Hazelcast inizializzato");
}
List<PolicyGroupByActiveThreadsType> l = new ArrayList<PolicyGroupByActiveThreadsType>();
l.addAll(staticMapInstance.keySet());
return l;
}
public static boolean isAttivo(PolicyGroupByActiveThreadsType type) {
if(staticMapInstance==null){
return false;
}
return staticMapInstance.containsKey(type);
}
public static HazelcastInstance getInstance(PolicyGroupByActiveThreadsType type) throws PolicyException{
if(staticMapInstance==null){
throw new PolicyException("Nessun gestore Hazelcast inizializzato");
}
HazelcastInstance gestore = staticMapInstance.get(type);
if(gestore==null) {
HazelcastManager.initialize(type);
gestore = staticMapInstance.get(type);
}
if(gestore==null) {
throw new PolicyException("Gestore Hazelcast '"+type+"' non inizializzato ??");
}
return gestore;
}
private static synchronized void initialize(PolicyGroupByActiveThreadsType type) throws PolicyException{
if(!HazelcastManager.staticMapInstance.containsKey(type)) {
info("Inizializzazione Gestore Hazelcast '"+type+"' ...");
HazelcastManager.staticMapInstance.put(type, newInstance(type));
info("Inizializzazione Gestore Hazelcast '"+type+"' effettuata con successo");
}
}
private static synchronized HazelcastInstance newInstance(PolicyGroupByActiveThreadsType type) throws PolicyException {
String content = HazelcastManager.staticMapConfig.get(type);
if(content==null) {
throw new PolicyException("Hazelcast config undefined for type '"+type+"'");
}
String groupId = null;
switch (type) {
case HAZELCAST_MAP:
groupId = HazelcastManager.groupId+"-map";
break;
case HAZELCAST_NEAR_CACHE:
groupId = HazelcastManager.groupId+"-near-cache";
break;
case HAZELCAST_NEAR_CACHE_UNSAFE_SYNC_MAP:
groupId = HazelcastManager.groupId+"-near-cache-unsafe-sync-map";
break;
case HAZELCAST_NEAR_CACHE_UNSAFE_ASYNC_MAP:
groupId = HazelcastManager.groupId+"-near-cache-unsafe-async-map";
break;
case HAZELCAST_LOCAL_CACHE:
groupId = HazelcastManager.groupId+"-local-cache";
break;
case HAZELCAST_REPLICATED_MAP:
groupId = HazelcastManager.groupId+"-replicated-map";
break;
case HAZELCAST_PNCOUNTER:
groupId = HazelcastManager.groupId+"-pncounter";
break;
case HAZELCAST_ATOMIC_LONG:
groupId = HazelcastManager.groupId+"-atomic-long";
break;
case HAZELCAST_ATOMIC_LONG_ASYNC:
groupId = HazelcastManager.groupId+"-atomic-long-async";
break;
default:
throw new PolicyException("Hazelcast type '"+type+"' unsupported");
}
content = content.replace("cluster-name:", "cluster-name: "+groupId+"\n#cluster-name:");
debug("Inizializzo hazelcast con la seguente configurazione (cluster-id"+groupId+"): " + content);
InMemoryYamlConfig hazelcastConfig = new InMemoryYamlConfig(content);
hazelcastConfig.setClusterName(groupId);
setNetwork(hazelcastConfig, content, groupId);
HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(hazelcastConfig);
if(hazelcast==null) {
throw new PolicyException("Hazelcast init failed");
}
return hazelcast;
}
private static void setNetwork(InMemoryYamlConfig hazelcastConfig, String hazelcastConfigContent, String groupId) throws PolicyException {
Config sharedConfig = null;
if(HazelcastManager.shareConfigFile!=null) {
try {
// La valido ma poi la ricostruiro' tutte le volte per poi sostituire la porta
try(FileInputStream fin = new FileInputStream(HazelcastManager.shareConfigFile)){
YamlConfigBuilder builder = new YamlConfigBuilder(fin);
sharedConfig = builder.build();
}
}catch(Throwable t) {
// e' gia' stato validato in fase di inizializzazione. Un errore qua non dovrebbe succedere
throw new PolicyException("Configuration '"+shareConfigFile.getAbsolutePath()+"' is not valid yaml config (YamlConfigBuilder): "+t.getMessage(),t);
}
}
if(sharedConfig!=null && sharedConfig.getNetworkConfig()!=null) {
NetworkConfig ncShared = sharedConfig.getNetworkConfig();
NetworkConfig ncInstance = hazelcastConfig.getNetworkConfig();
Integer portHazelcastConfigInstance = null;
if(ncInstance==null) {
// non dovrebbe mai esserlo, essendo definita una porta per ogni tipo di istanza
ncInstance = new NetworkConfig();
hazelcastConfig.setNetworkConfig(ncInstance);
}
else {
portHazelcastConfigInstance = hazelcastConfig.getNetworkConfig().getPort();
}
JsonNode instanceNode = null;
try {
instanceNode = yamlUtils.getAsNode(hazelcastConfigContent);
}catch(Throwable t) {
throw new PolicyException("Configuration '"+groupId+"' is not valid yaml config:\n"+hazelcastConfigContent);
}
// overrides the public address of a member
if(ncShared.getPublicAddress()!=null) {
boolean definedInstance = isDefined("public-address", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setPublicAddress(ncShared.getPublicAddress());
debug("(cluster-id"+groupId+") override public-address: " + ncInstance.getPublicAddress());
}
}
// overrides only defined outbound ports
if( (ncShared.getOutboundPorts()!=null && !ncShared.getOutboundPorts().isEmpty())
||
(ncShared.getOutboundPortDefinitions()!=null && !ncShared.getOutboundPortDefinitions().isEmpty())) {
boolean definedInstance = isDefined("outbound-ports", yamlUtils, engine, instanceNode);
if(!definedInstance) {
if(ncShared.getOutboundPorts()!=null && !ncShared.getOutboundPorts().isEmpty()){
ncInstance.setOutboundPorts(ncShared.getOutboundPorts());
debug("(cluster-id"+groupId+") override outbound-ports: " + ncInstance.getOutboundPorts());
}
if((ncShared.getOutboundPortDefinitions()!=null && !ncShared.getOutboundPortDefinitions().isEmpty())) {
ncInstance.setOutboundPortDefinitions(ncShared.getOutboundPortDefinitions());
debug("(cluster-id"+groupId+") override outbound-ports (definitions): " + ncInstance.getOutboundPortDefinitions());
}
}
}
// If you set the reuse-address element to true, the TIME_WAIT state is ignored and you can bind the member to the same port again
boolean definedShared = isDefined("reuse-address", yamlUtils, engine, HazelcastManager.sharedConfigNode);
if(definedShared) {
boolean definedInstance = isDefined("reuse-address", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setReuseAddress(ncShared.isReuseAddress());
debug("(cluster-id"+groupId+") override reuse-address: " + ncInstance.isReuseAddress());
}
}
// the ports that Hazelcast uses to communicate between cluster members
//ncShared.getPort();
//ncShared.getPortCount();
//ncShared.isPortAutoIncrement();
// L'unico elemento che viene ignorato se presente è il 'port'.
// Anche se definito verrà ignorato poichè ogni singola configurazione utilizzata in GovWay richiede una porta dedicata.
// Per modificarla si deve agire nel file govway.hazelcast-*.yaml presente all'interno dell'archivio govway.ear,
// modificandolo direttamente dentro l'archivio o riportandolo nella directory di configurazione esterna.
// override join
if(ncShared.getJoin()!=null) {
boolean definedInstance = isDefined("join", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setJoin(ncShared.getJoin());
if(ncInstance.getJoin().getTcpIpConfig()!=null) {
if(portHazelcastConfigInstance!=null && portHazelcastConfigInstance.intValue()>0) {
if(ncInstance.getJoin().getTcpIpConfig().getMembers()!=null && !ncInstance.getJoin().getTcpIpConfig().getMembers().isEmpty()) {
// Se vengono usate le porte, devo clonare la lista e sistemarla con la porta usata sull'attuale istanza
List<String> newList = new ArrayList<>();
for (String member : ncInstance.getJoin().getTcpIpConfig().getMembers()) {
try {
if(member.contains(GOVWAY_INSTANCE_PORT)) {
newList.add(member.replace(GOVWAY_INSTANCE_PORT, portHazelcastConfigInstance.intValue()+""));
}
else if(member.contains(GOVWAY_INSTANCE_PORT.toLowerCase())) {
newList.add(member.replace(GOVWAY_INSTANCE_PORT.toLowerCase(), portHazelcastConfigInstance.intValue()+""));
}
else {
newList.add(member);
}
/*
int indexOfFirst = member.indexOf(":");
if(indexOfFirst>0 && indexOfFirst!=(member.length()-1)) {
int indexOfSecondIpv6 = member.indexOf(":",(indexOfFirst+1));
if(indexOfSecondIpv6<0) {
usePortIpv4=true;
}
}
if(usePortIpv4) {
String [] split = member.split(":");
String newIp = split[0] + ":" + portHazelcastConfigInstance.intValue();
newList.add(newIp);
}
else {
newList.add(member);
}*/
}catch(Throwable t) {
error("(cluster-id"+groupId+") tpcip analisi member '"+member+"' fallita: " +t.getMessage(),t);
//newList.add(member);
}
}
ncInstance.getJoin().getTcpIpConfig().setMembers(newList);
}
}
}
debug("(cluster-id"+groupId+") override join: " + ncInstance.getJoin());
}
}
// override interface
if(ncShared.getInterfaces()!=null && ncShared.getInterfaces().isEnabled()) {
boolean definedInstance = isDefined("interfaces", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setInterfaces(ncShared.getInterfaces());
debug("(cluster-id"+groupId+") override interfaces: " + ncInstance.getInterfaces());
}
}
// override altre configurazioni avanzate
// example yaml: https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java
if(ncShared.getIcmpFailureDetectorConfig()!=null && ncShared.getIcmpFailureDetectorConfig().isEnabled()) {
boolean definedInstance = isDefined("failure-detector", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setIcmpFailureDetectorConfig(ncShared.getIcmpFailureDetectorConfig());
debug("(cluster-id"+groupId+") override failure-detector icmp: " + ncInstance.getIcmpFailureDetectorConfig());
}
}
if(ncShared.getMemberAddressProviderConfig()!=null && ncShared.getMemberAddressProviderConfig().isEnabled()) {
boolean definedInstance = isDefined("member-address-provider", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setMemberAddressProviderConfig(ncShared.getMemberAddressProviderConfig());
debug("(cluster-id"+groupId+") override member-address-provider: " + ncInstance.getMemberAddressProviderConfig());
}
}
if(ncShared.getMemcacheProtocolConfig()!=null && ncShared.getMemcacheProtocolConfig().isEnabled()) {
boolean definedInstance = isDefined("memcache-protocol", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setMemcacheProtocolConfig(ncShared.getMemcacheProtocolConfig());
debug("(cluster-id"+groupId+") override memcache-protocol: " + ncInstance.getMemcacheProtocolConfig());
}
}
if(ncShared.getRestApiConfig()!=null && ncShared.getRestApiConfig().isEnabled()) {
boolean definedInstance = isDefined("rest-api", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setRestApiConfig(ncShared.getRestApiConfig());
debug("(cluster-id"+groupId+") override rest-api: " + ncInstance.getRestApiConfig());
}
}
if(ncShared.getSocketInterceptorConfig()!=null && ncShared.getSocketInterceptorConfig().getClassName()!=null) {
boolean definedInstance = isDefined("socket-interceptor", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setSocketInterceptorConfig(ncShared.getSocketInterceptorConfig());
debug("(cluster-id"+groupId+") override socket-interceptor: " + ncInstance.getSocketInterceptorConfig());
}
}
if(ncShared.getSSLConfig()!=null && ncShared.getSSLConfig().isEnabled()) {
boolean definedInstance = isDefined("ssl", yamlUtils, engine, instanceNode);
if(!definedInstance) {
ncInstance.setSSLConfig(ncShared.getSSLConfig());
debug("(cluster-id"+groupId+") override ssl: " + ncInstance.getSSLConfig());
}
}
/* Deprecata
if(ncShared.getSymmetricEncryptionConfig()!=null && ncShared.getSymmetricEncryptionConfig().isEnabled()) {
boolean definedInstance = isDefined("symmetric-encryption", yamlUtils, engine, node);
if(!definedInstance) {
ncInstance.setSymmetricEncryptionConfig(ncShared.getSymmetricEncryptionConfig());
debug("(cluster-id"+groupId+") override symmetric-encryption: " + ncInstance.getSymmetricEncryptionConfig());
}
}
*/
}
}
private static boolean isDefined(String pattern, YAMLUtils yamlUtils, JsonPathExpressionEngine engine, JsonNode node) throws PolicyException {
String prefixPattern = "$.hazelcast.network.";
try {
JsonNode result = engine.getJsonNodeMatchPattern(node, prefixPattern+pattern);
return result!=null;
}catch(JsonPathNotFoundException notFound) {
return false;
}catch(Throwable t) {
throw new PolicyException(t.getMessage(),t);
}
}
private static void debug(String msg) {
if(HazelcastManager.log!=null) {
HazelcastManager.log.debug(msg);
}
if(HazelcastManager.logStartup!=null) {
HazelcastManager.logStartup.debug(msg);
}
}
private static void info(String msg) {
if(HazelcastManager.log!=null) {
HazelcastManager.log.info(msg);
}
if(HazelcastManager.logStartup!=null) {
HazelcastManager.logStartup.info(msg);
}
}
private static void error(String msg, Throwable e) {
if(HazelcastManager.log!=null) {
HazelcastManager.log.error(msg,e);
}
if(HazelcastManager.logStartup!=null) {
HazelcastManager.logStartup.error(msg,e);
}
}
public static synchronized void close() {
if(HazelcastManager.staticMapInstance!=null && !HazelcastManager.staticMapInstance.isEmpty()) {
for (PolicyGroupByActiveThreadsType type : HazelcastManager.staticMapInstance.keySet()) {
HazelcastInstance hazelcast = HazelcastManager.staticMapInstance.get(type);
try {
hazelcast.shutdown();
}catch(Throwable t) {
HazelcastManager.log.debug("Hazelcast '"+type+"' shutdown failed: " + t.getMessage(),t);
}
}
}
}
}