Added bootstrap code to auto-generate cluster.json, tablets.json and hazelcast.xml using local site address.
Change-Id: I3210aadc63403022b4aac3bc3591736801240b50
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index dec2c23..0472cff 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -57,6 +57,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.net.NetworkInterface.getNetworkInterfaces;
+import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -70,7 +72,7 @@
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
- private final Logger log = getLogger(DistributedClusterStore.class);
+ private static final Logger log = getLogger(DistributedClusterStore.class);
// TODO: make these configurable.
private static final int HEARTBEAT_FD_PORT = 2419;
@@ -81,14 +83,16 @@
private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+ public static final int DEFAULT_PORT = 9876;
+
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(HeartbeatMessage.class)
- .build()
- .populate(1);
+ .register(KryoNamespaces.API)
+ .register(HeartbeatMessage.class)
+ .build()
+ .populate(1);
}
};
@@ -112,18 +116,22 @@
@Activate
public void activate() {
- File clusterDefinitionFile = new File(CONFIG_DIR,
- CLUSTER_DEFINITION_FILE);
+ File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
+ ClusterDefinitionStore clusterDefinitionStore =
+ new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+
+ if (!clusterDefinitionFile.exists()) {
+ createDefaultClusterDefinition(clusterDefinitionStore);
+ }
try {
- clusterDefinition = new ClusterDefinitionStore(
- clusterDefinitionFile.getPath()).read();
+ clusterDefinition = clusterDefinitionStore.read();
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
- .map(nodeInfo -> new DefaultControllerNode(new NodeId(
- nodeInfo.getId()), IpAddress.valueOf(nodeInfo
- .getIp()), nodeInfo.getTcpPort()))
+ .map(nodeInfo -> new DefaultControllerNode(new NodeId(nodeInfo.getId()),
+ IpAddress.valueOf(nodeInfo.getIp()),
+ nodeInfo.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
throw new IllegalStateException(
@@ -148,16 +156,51 @@
+ " failure detector communication channel.", e);
}
messagingService.registerHandler(HEARTBEAT_MESSAGE,
- new HeartbeatMessageHandler(), heartBeatMessageHandler);
+ new HeartbeatMessageHandler(), heartBeatMessageHandler);
failureDetector = new PhiAccrualFailureDetector();
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
log.info("Started");
}
+ private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
+ // Assumes IPv4 is returned.
+ String ip = DistributedClusterStore.getSiteLocalAddress();
+ String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
+ NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
+ try {
+ store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
+ } catch (IOException e) {
+ log.warn("Unable to write default cluster definition", e);
+ }
+ }
+
+ /**
+ * Returns the site local address if one can be found, loopback otherwise.
+ *
+ * @return site-local address in string form
+ */
+ public static String getSiteLocalAddress() {
+ try {
+ for (NetworkInterface nif : list(getNetworkInterfaces())) {
+ for (InetAddress address : list(nif.getInetAddresses())) {
+ if (address.getAddress()[0] == (byte) 0xC0) {
+ return address.toString().substring(1);
+ }
+ }
+ }
+ return InetAddress.getLoopbackAddress().toString().substring(1);
+
+ } catch (SocketException e) {
+ log.error("Unable to get network interfaces", e);
+ }
+
+ return null;
+ }
+
@Deactivate
public void deactivate() {
try {
@@ -300,7 +343,7 @@
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
- Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
+ Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {