In preparation for dynamic clustering support:
- Added Cluster metadata service and metadata store interfaces
- Added a static cluster metadata store implementation that is backed by a local file.
- Consolidated the existing cluster.json and tablets.json metadata files into a single cluster.json file that holds all cluster-related metadata.
- Removed dependency on ONOS_NIC env variable.
Change-Id: Ia0a8bb69740caecdcdde71a9408be37c56ae2504
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinition.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinition.java
deleted file mode 100644
index 75f05a3..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinition.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
-/**
- * Cluster definition.
- */
-public class ClusterDefinition {
-
- private Set<NodeInfo> nodes;
- private String ipPrefix;
-
- /**
- * Creates a new cluster definition.
- * @param nodes cluster nodes information
- * @param ipPrefix ip prefix common to all cluster nodes
- * @return cluster definition
- */
- public static ClusterDefinition from(Set<NodeInfo> nodes, String ipPrefix) {
- ClusterDefinition definition = new ClusterDefinition();
- definition.ipPrefix = ipPrefix;
- definition.nodes = ImmutableSet.copyOf(nodes);
- return definition;
- }
-
- /**
- * Returns set of cluster nodes info.
- * @return cluster nodes info
- */
- public Set<NodeInfo> getNodes() {
- return ImmutableSet.copyOf(nodes);
- }
-
- /**
- * Returns ipPrefix in dotted decimal notion.
- * @return ip prefix
- */
- public String getIpPrefix() {
- return ipPrefix;
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
deleted file mode 100644
index 8b0001d..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterDefinitionService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.consistent.impl.DatabaseDefinition;
-import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static java.net.NetworkInterface.getNetworkInterfaces;
-import static java.util.Collections.list;
-import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
-import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Implementation of ClusterDefinitionService.
- */
-@Component(immediate = true)
-@Service
-public class ClusterDefinitionManager implements ClusterDefinitionService {
-
- public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
- private static final String ONOS_NIC = "ONOS_NIC";
- private static final Logger log = getLogger(ClusterDefinitionManager.class);
- private ControllerNode localNode;
- private Set<ControllerNode> seedNodes;
-
- @Activate
- public void activate() {
- File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
- ClusterDefinitionStore clusterDefinitionStore =
- new ClusterDefinitionStore(clusterDefinitionFile.getPath());
-
- if (!clusterDefinitionFile.exists()) {
- createDefaultClusterDefinition(clusterDefinitionStore);
- }
-
- try {
- ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
- establishSelfIdentity(clusterDefinition);
- seedNodes = ImmutableSet
- .copyOf(clusterDefinition.getNodes())
- .stream()
- .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
- .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
- IpAddress.valueOf(n.getIp()),
- n.getTcpPort()))
- .collect(Collectors.toSet());
- } catch (IOException e) {
- throw new IllegalStateException("Failed to read cluster definition.", e);
- }
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public ControllerNode localNode() {
- return localNode;
- }
-
- @Override
- public Set<ControllerNode> seedNodes() {
- return seedNodes;
- }
-
- @Override
- public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
- try {
- Set<NodeInfo> infos = Sets.newHashSet();
- nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
- n.ip().toString(),
- n.tcpPort())));
-
- ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
- new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
-
- DatabaseDefinition ddef = DatabaseDefinition.from(infos);
- new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
- } catch (IOException e) {
- log.error("Unable to form cluster", e);
- }
- }
-
- private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
- Enumeration<NetworkInterface> interfaces =
- NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface iface = interfaces.nextElement();
- Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
- if (clusterDefinition.getNodes().stream()
- .map(NodeInfo::getIp)
- .map(IpAddress::valueOf)
- .anyMatch(nodeIp -> ip.equals(nodeIp))) {
- return ip;
- }
- }
- }
- throw new IllegalStateException("Unable to determine local ip");
- }
-
- private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
- try {
- IpAddress ip = findLocalIp(clusterDefinition);
- localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
- } catch (SocketException e) {
- throw new IllegalStateException("Cannot determine local IP", e);
- }
- }
-
- private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
- // Assumes IPv4 is returned.
- String ip = getSiteLocalAddress();
- String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
- NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
- try {
- store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
- } catch (IOException e) {
- log.warn("Unable to write default cluster definition", e);
- }
- }
-
- /**
- * Returns the address that matches the IP prefix given in ONOS_NIC
- * environment variable if one was specified, or the first site local
- * address if one can be found or the loopback address otherwise.
- *
- * @return site-local address in string form
- */
- public static String getSiteLocalAddress() {
- try {
- String ipPrefix = System.getenv(ONOS_NIC);
- for (NetworkInterface nif : list(getNetworkInterfaces())) {
- for (InetAddress address : list(nif.getInetAddresses())) {
- IpAddress ip = IpAddress.valueOf(address);
- if (ipPrefix == null && address.isSiteLocalAddress() ||
- ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
- return ip.toString();
- }
- }
- }
- } catch (SocketException e) {
- log.error("Unable to get network interfaces", e);
- }
-
- return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
- }
-
- // Indicates whether the specified interface address matches the given prefix.
- // FIXME: Add a facility to IpPrefix to make this more robust
- private static boolean matchInterface(String ip, String ipPrefix) {
- String s = ipPrefix.replaceAll("\\.\\*", "");
- return ip.startsWith(s);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
deleted file mode 100644
index 2a2f4dc..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.io.Files;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Allows for reading and writing cluster definition as a JSON file.
- */
-public class ClusterDefinitionStore {
-
- private final File file;
-
- /**
- * Creates a reader/writer of the cluster definition file.
- * @param filePath location of the definition file
- */
- public ClusterDefinitionStore(String filePath) {
- file = new File(filePath);
- }
-
- /**
- * Returns the cluster definition.
- * @return cluster definition
- * @throws IOException when I/O exception of some sort has occurred
- */
- public ClusterDefinition read() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(file, ClusterDefinition.class);
- }
-
- /**
- * Writes the specified cluster definition to file.
- * @param definition cluster definition
- * @throws IOException when I/O exception of some sort has occurred
- */
- public void write(ClusterDefinition definition) throws IOException {
- checkNotNull(definition);
- // write back to file
- Files.createParentDirs(file);
- ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(file, definition);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 859efeb..3bb6a70 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -27,8 +27,8 @@
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
@@ -99,14 +99,14 @@
private ControllerNode localNode;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterDefinitionService clusterDefinitionService;
+ protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
@Activate
public void activate() {
- localNode = clusterDefinitionService.localNode();
+ localNode = clusterMetadataService.getLocalNode();
messagingService.registerHandler(HEARTBEAT_MESSAGE,
new HeartbeatMessageHandler(), heartBeatMessageHandler);
@@ -116,9 +116,6 @@
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
- addNode(localNode);
- updateState(localNode.id(), State.ACTIVE);
-
log.info("Started");
}
@@ -188,7 +185,7 @@
private void addNode(ControllerNode node) {
allNodes.put(node.id(), node);
- updateState(node.id(), State.INACTIVE);
+ updateState(node.id(), node.equals(localNode) ? State.ACTIVE : State.INACTIVE);
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java
new file mode 100644
index 0000000..9f6c413
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.net.NetworkInterface.getNetworkInterfaces;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataStore;
+import org.onosproject.cluster.ClusterMetadataStoreDelegate;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * ClusterMetadataStore backed by a local file.
+ * <p>
+ * Metadata is read from {@code ../config/cluster.json} on activation. When that
+ * file does not exist, a single-node cluster named "default" is built in memory
+ * from a site-local address of this host.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class StaticClusterMetadataStore
+        extends AbstractStore<ClusterMetadataEvent, ClusterMetadataStoreDelegate>
+        implements ClusterMetadataStore {
+
+    private static final String CLUSTER_METADATA_FILE = "../config/cluster.json";
+    private static final int DEFAULT_ONOS_PORT = 9876;
+
+    private final Logger log = getLogger(getClass());
+    private final File metadataFile = new File(CLUSTER_METADATA_FILE);
+    private final AtomicReference<ClusterMetadata> metadata = new AtomicReference<>();
+    private ObjectMapper mapper;
+    // Written on activation and after every successful metadata write; volatile
+    // so that readers on other threads observe the latest value.
+    private volatile long version;
+
+    /**
+     * Sets up the JSON mapper and loads the cluster metadata, either from the
+     * metadata file or, when it is absent, as a default single-node cluster.
+     */
+    @Activate
+    public void activate() {
+        mapper = new ObjectMapper();
+        SimpleModule module = new SimpleModule();
+        module.addSerializer(NodeId.class, new NodeIdSerializer());
+        module.addDeserializer(NodeId.class, new NodeIdDeserializer());
+        module.addSerializer(ControllerNode.class, new ControllerNodeSerializer());
+        module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer());
+        mapper.registerModule(module);
+        if (metadataFile.exists()) {
+            try {
+                metadata.set(mapper.readValue(metadataFile, ClusterMetadata.class));
+                version = metadataFile.lastModified();
+            } catch (IOException e) {
+                throw Throwables.propagate(e);
+            }
+        } else {
+            String localIp = getSiteLocalAddress();
+            ControllerNode localNode =
+                    new DefaultControllerNode(new NodeId(localIp), IpAddress.valueOf(localIp), DEFAULT_ONOS_PORT);
+            metadata.set(ClusterMetadata.builder()
+                    .withName("default")
+                    .withControllerNodes(Arrays.asList(localNode))
+                    .withPartitions(Lists.newArrayList(new Partition("p1", Lists.newArrayList(localNode.id()))))
+                    .build());
+            version = System.currentTimeMillis();
+        }
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public void setDelegate(ClusterMetadataStoreDelegate delegate) {
+        checkNotNull(delegate, "Delegate cannot be null");
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void unsetDelegate(ClusterMetadataStoreDelegate delegate) {
+        this.delegate = null;
+    }
+
+    @Override
+    public boolean hasDelegate() {
+        return this.delegate != null;
+    }
+
+    @Override
+    public Versioned<ClusterMetadata> getClusterMetadata() {
+        return new Versioned<>(metadata.get(), version);
+    }
+
+    @Override
+    public void setClusterMetadata(ClusterMetadata metadata) {
+        checkNotNull(metadata);
+        try {
+            Files.createParentDirs(metadataFile);
+            mapper.writeValue(metadataFile, metadata);
+            this.metadata.set(metadata);
+            // Keep the reported version in step with the file just written,
+            // mirroring how the version is derived when the file is loaded.
+            version = metadataFile.lastModified();
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public void setActiveReplica(String partitionId, NodeId nodeId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unsetActiveReplica(String partitionId, NodeId nodeId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation returns the configured members of the named
+     * partition, or {@code null} if there is no partition with that name.
+     */
+    @Override
+    public Collection<NodeId> getActiveReplicas(String partitionId) {
+        return metadata.get().getPartitions()
+                .stream()
+                .filter(r -> r.getName().equals(partitionId))
+                .findFirst()
+                .map(Partition::getMembers)
+                .orElse(null);
+    }
+
+    /** Writes a controller node as a JSON object with id, ip and port fields. */
+    private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> {
+        @Override
+        public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider)
+                throws IOException, JsonProcessingException {
+            jgen.writeStartObject();
+            jgen.writeStringField("id", node.id().toString());
+            jgen.writeStringField("ip", node.ip().toString());
+            jgen.writeNumberField("port", node.tcpPort());
+            jgen.writeEndObject();
+        }
+    }
+
+    /** Reads a controller node from a JSON object with id, ip and port fields. */
+    private static class ControllerNodeDeserializer extends JsonDeserializer<ControllerNode> {
+        @Override
+        public ControllerNode deserialize(JsonParser jp, DeserializationContext ctxt)
+                throws IOException, JsonProcessingException {
+            JsonNode node = jp.getCodec().readTree(jp);
+            NodeId nodeId = new NodeId(node.get("id").textValue());
+            IpAddress ip = IpAddress.valueOf(node.get("ip").textValue());
+            int port = node.get("port").asInt();
+            return new DefaultControllerNode(nodeId, ip, port);
+        }
+    }
+
+    /** Writes a node identifier as a plain JSON string. */
+    private static class NodeIdSerializer extends JsonSerializer<NodeId> {
+        @Override
+        public void serialize(NodeId nodeId, JsonGenerator jgen, SerializerProvider provider)
+                throws IOException, JsonProcessingException {
+            jgen.writeString(nodeId.toString());
+        }
+    }
+
+    /** Reads a node identifier from the text form of a JSON value. */
+    private static class NodeIdDeserializer extends JsonDeserializer<NodeId> {
+        @Override
+        public NodeId deserialize(JsonParser jp, DeserializationContext ctxt)
+                throws IOException, JsonProcessingException {
+            JsonNode node = jp.getCodec().readTree(jp);
+            return new NodeId(node.asText());
+        }
+    }
+
+    /**
+     * Returns a site-local address of this host in string form: one bound to
+     * eth0 if there is any, otherwise the first one found on any interface,
+     * otherwise the loopback address.
+     *
+     * @return local IP address in string form
+     * @throws IllegalStateException if querying the network interfaces fails
+     */
+    private static String getSiteLocalAddress() {
+        Function<NetworkInterface, IpAddress> ipLookup = nif -> {
+            // getByName() yields null when no interface has the given name;
+            // treat that as "no address" so the other interfaces get tried.
+            if (nif == null) {
+                return null;
+            }
+            for (InetAddress address : Collections.list(nif.getInetAddresses())) {
+                if (address.isSiteLocalAddress()) {
+                    return IpAddress.valueOf(address);
+                }
+            }
+            return null;
+        };
+        try {
+            IpAddress ip = ipLookup.apply(NetworkInterface.getByName("eth0"));
+            if (ip != null) {
+                return ip.toString();
+            }
+            for (NetworkInterface nif : Collections.list(getNetworkInterfaces())) {
+                ip = ipLookup.apply(nif);
+                if (ip != null) {
+                    return ip.toString();
+                }
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to get network interfaces", e);
+        }
+        return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
+    }
+}
\ No newline at end of file