ONOS-1982: MessagingService is now a OSGi service. Has implementations based on Netty and IOLoop
Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
new file mode 100644
index 0000000..4d12723
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
@@ -0,0 +1,172 @@
+package org.onosproject.store.cluster.impl;
+
+import static com.hazelcast.util.AddressUtil.matchInterface;
+import static java.net.NetworkInterface.getNetworkInterfaces;
+import static java.util.Collections.list;
+import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
+import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.consistent.impl.DatabaseDefinition;
+import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.hazelcast.util.AddressUtil;
+
+/**
+ * Implementation of ClusterDefinitionService.
+ */
+@Component(immediate = true)
+@Service
+public class ClusterDefinitionManager implements ClusterDefinitionService {
+
+ public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
+ private static final String ONOS_NIC = "ONOS_NIC";
+ private static final Logger log = getLogger(ClusterDefinitionManager.class);
+ private ControllerNode localNode;
+ private Set<ControllerNode> seedNodes;
+
+ @Activate
+ public void activate() {
+ File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
+ ClusterDefinitionStore clusterDefinitionStore =
+ new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+
+ if (!clusterDefinitionFile.exists()) {
+ createDefaultClusterDefinition(clusterDefinitionStore);
+ }
+
+ try {
+ ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
+ establishSelfIdentity(clusterDefinition);
+ seedNodes = ImmutableSet
+ .copyOf(clusterDefinition.getNodes())
+ .stream()
+ .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
+ .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
+ IpAddress.valueOf(n.getIp()),
+ n.getTcpPort()))
+ .collect(Collectors.toSet());
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to read cluster definition.", e);
+ }
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public ControllerNode localNode() {
+ return localNode;
+ }
+
+ @Override
+ public Set<ControllerNode> seedNodes() {
+ return seedNodes;
+ }
+
+ @Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ try {
+ Set<NodeInfo> infos = Sets.newHashSet();
+ nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
+ n.ip().toString(),
+ n.tcpPort())));
+
+ ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
+ new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
+
+ DatabaseDefinition ddef = DatabaseDefinition.from(infos);
+ new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
+ } catch (IOException e) {
+ log.error("Unable to form cluster", e);
+ }
+ }
+
+ private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
+ Enumeration<NetworkInterface> interfaces =
+ NetworkInterface.getNetworkInterfaces();
+ while (interfaces.hasMoreElements()) {
+ NetworkInterface iface = interfaces.nextElement();
+ Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
+ if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
+ return ip;
+ }
+ }
+ }
+ throw new IllegalStateException("Unable to determine local ip");
+ }
+
+ private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
+ try {
+ IpAddress ip = findLocalIp(clusterDefinition);
+ localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ } catch (SocketException e) {
+ throw new IllegalStateException("Cannot determine local IP", e);
+ }
+ }
+
+ private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
+ // Assumes IPv4 is returned.
+ String ip = getSiteLocalAddress();
+ String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
+ NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
+ try {
+ store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
+ } catch (IOException e) {
+ log.warn("Unable to write default cluster definition", e);
+ }
+ }
+
+ /**
+ * Returns the address that matches the IP prefix given in ONOS_NIC
+ * environment variable if one was specified, or the first site local
+ * address if one can be found or the loopback address otherwise.
+ *
+ * @return site-local address in string form
+ */
+ public static String getSiteLocalAddress() {
+ try {
+ String ipPrefix = System.getenv(ONOS_NIC);
+ for (NetworkInterface nif : list(getNetworkInterfaces())) {
+ for (InetAddress address : list(nif.getInetAddresses())) {
+ IpAddress ip = IpAddress.valueOf(address);
+ if (ipPrefix == null && address.isSiteLocalAddress() ||
+ ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
+ return ip.toString();
+ }
+ }
+ }
+ } catch (SocketException e) {
+ log.error("Unable to get network interfaces", e);
+ }
+
+ return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 149a858..ec297e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,17 +17,17 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
-import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
@@ -37,18 +37,12 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.consistent.impl.DatabaseDefinition;
-import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
+import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
-import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -59,12 +53,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.hazelcast.util.AddressUtil.matchInterface;
-import static java.net.NetworkInterface.getNetworkInterfaces;
-import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
-import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@@ -79,11 +68,9 @@
private static final Logger log = getLogger(DistributedClusterStore.class);
- public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
// TODO: make these configurable.
- private static final int HEARTBEAT_FD_PORT = 2419;
private static final int HEARTBEAT_INTERVAL_MS = 100;
private static final int PHI_FAILURE_THRESHOLD = 10;
@@ -99,16 +86,10 @@
};
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
- private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
- private static final String ONOS_NIC = "ONOS_NIC";
- private ClusterDefinition clusterDefinition;
-
- private Set<ControllerNode> seedNodes;
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
- private NettyMessagingManager messagingService;
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
@@ -118,45 +99,16 @@
private ControllerNode localNode;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterDefinitionService clusterDefinitionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
+
@Activate
public void activate() {
- File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
- ClusterDefinitionStore clusterDefinitionStore =
- new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+ localNode = clusterDefinitionService.localNode();
- if (!clusterDefinitionFile.exists()) {
- createDefaultClusterDefinition(clusterDefinitionStore);
- }
-
- try {
- clusterDefinition = clusterDefinitionStore.read();
- seedNodes = ImmutableSet
- .copyOf(clusterDefinition.getNodes())
- .stream()
- .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
- IpAddress.valueOf(n.getIp()),
- n.getTcpPort()))
- .collect(Collectors.toSet());
- } catch (IOException e) {
- throw new IllegalStateException("Failed to read cluster definition.", e);
- }
-
- seedNodes.forEach(node -> {
- allNodes.put(node.id(), node);
- updateState(node.id(), State.INACTIVE);
- });
-
- establishSelfIdentity();
-
- messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT);
- try {
- messagingService.activate();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(
- "Failed to cleanly initialize membership and"
- + " failure detector communication channel.", e);
- }
messagingService.registerHandler(HEARTBEAT_MESSAGE,
new HeartbeatMessageHandler(), heartBeatMessageHandler);
@@ -165,56 +117,15 @@
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ addNode(localNode);
+ updateState(localNode.id(), State.ACTIVE);
+
log.info("Started");
}
- private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
- // Assumes IPv4 is returned.
- String ip = DistributedClusterStore.getSiteLocalAddress();
- String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
- NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
- try {
- store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
- } catch (IOException e) {
- log.warn("Unable to write default cluster definition", e);
- }
- }
-
- /**
- * Returns the address that matches the IP prefix given in ONOS_NIC
- * environment variable if one was specified, or the first site local
- * address if one can be found or the loopback address otherwise.
- *
- * @return site-local address in string form
- */
- public static String getSiteLocalAddress() {
- try {
- String ipPrefix = System.getenv(ONOS_NIC);
- for (NetworkInterface nif : list(getNetworkInterfaces())) {
- for (InetAddress address : list(nif.getInetAddresses())) {
- IpAddress ip = IpAddress.valueOf(address);
- if (ipPrefix == null && address.isSiteLocalAddress() ||
- ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
- return ip.toString();
- }
- }
- }
-
- } catch (SocketException e) {
- log.error("Unable to get network interfaces", e);
- }
-
- return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
- }
-
@Deactivate
public void deactivate() {
- try {
- messagingService.deactivate();
- } catch (Exception e) {
- log.trace("Failed to cleanly shutdown cluster membership messaging", e);
- }
-
+ messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
heartBeatSender.shutdownNow();
heartBeatMessageHandler.shutdownNow();
@@ -262,9 +173,7 @@
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
- allNodes.put(node.id(), node);
- updateState(nodeId, State.INACTIVE);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+ addNode(node);
return node;
}
@@ -278,22 +187,10 @@
}
}
- @Override
- public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
- try {
- Set<NodeInfo> infos = Sets.newHashSet();
- nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
- n.ip().toString(),
- n.tcpPort())));
-
- ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
- new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
-
- DatabaseDefinition ddef = DatabaseDefinition.from(infos);
- new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
- } catch (IOException e) {
- log.error("Unable to form cluster", e);
- }
+ private void addNode(ControllerNode node) {
+ allNodes.put(node.id(), node);
+ updateState(node.id(), State.INACTIVE);
+ notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
}
private void updateState(NodeId nodeId, State newState) {
@@ -301,18 +198,6 @@
nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
}
- private void establishSelfIdentity() {
- try {
- IpAddress ip = findLocalIp();
- localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
- allNodes.put(localNode.id(), localNode);
- updateState(localNode.id(), State.ACTIVE);
- log.info("Local Node: {}", localNode);
- } catch (SocketException e) {
- throw new IllegalStateException("Cannot determine local IP", e);
- }
- }
-
private void heartbeat() {
try {
Set<ControllerNode> peers = allNodes.values()
@@ -351,7 +236,7 @@
}
private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
- Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+ Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
try {
messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
} catch (IOException e) {
@@ -359,22 +244,6 @@
}
}
- private IpAddress findLocalIp() throws SocketException {
- Enumeration<NetworkInterface> interfaces =
- NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface iface = interfaces.nextElement();
- Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
- if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
- return ip;
- }
- }
- }
- throw new IllegalStateException("Unable to determine local ip");
- }
-
private class HeartbeatMessageHandler implements Consumer<byte[]> {
@Override
public void accept(byte[] message) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
index e88e2d2..c7be0db 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
@@ -130,11 +130,6 @@
}
@Override
- public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
- throw new UnsupportedOperationException("formCluster not implemented");
- }
-
- @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1a4512d..7475819 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,8 +21,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.NettyMessagingManager;
-import org.onlab.nio.service.IOLoopMessagingManager;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -60,47 +58,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
- // TODO: This probably should not be a OSGi service.
- private MessagingService messagingService;
-
- private final boolean useNetty = true;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
@Activate
public void activate() {
- ControllerNode localNode = clusterService.getLocalNode();
- if (useNetty) {
- NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort());
- try {
- netty.activate();
- messagingService = netty;
- } catch (Exception e) {
- log.error("NettyMessagingService#activate", e);
- }
- } else {
- IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort());
- try {
- ioLoop.activate();
- messagingService = ioLoop;
- } catch (Exception e) {
- log.error("IOLoopMessagingService#activate", e);
- }
- }
- log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
+ log.info("Started");
}
@Deactivate
public void deactivate() {
- // TODO: cleanup messageingService if needed.
- // FIXME: workaround until it becomes a service.
- try {
- if (useNetty) {
- ((NettyMessagingManager) messagingService).deactivate();
- } else {
- ((IOLoopMessagingManager) messagingService).deactivate();
- }
- } catch (Exception e) {
- log.error("MessagingService#deactivate", e);
- }
log.info("Stopped");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java
new file mode 100644
index 0000000..9e52c3e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java
@@ -0,0 +1,40 @@
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.nio.service.IOLoopMessaging;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOLoop based MessagingService.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class IOLoopMessagingManager extends IOLoopMessaging {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterDefinitionService clusterDefinitionService;
+
+ @Activate
+ public void activate() throws Exception {
+ ControllerNode localNode = clusterDefinitionService.localNode();
+ super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() throws Exception {
+ super.stop();
+ log.info("Stopped");
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
new file mode 100644
index 0000000..4777fdb
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -0,0 +1,40 @@
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.NettyMessaging;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty based MessagingService.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class NettyMessagingManager extends NettyMessaging {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterDefinitionService clusterDefinitionService;
+
+ @Activate
+ public void activate() throws Exception {
+ ControllerNode localNode = clusterDefinitionService.localNode();
+ super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() throws Exception {
+ super.stop();
+ log.info("Stopped");
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 71d1961..c06ebdf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -45,7 +45,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.IdGenerator;
-import org.onosproject.store.cluster.impl.DistributedClusterStore;
+import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
@@ -193,7 +193,7 @@
private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
// Assumes IPv4 is returned.
- String ip = DistributedClusterStore.getSiteLocalAddress();
+ String ip = ClusterDefinitionManager.getSiteLocalAddress();
NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
try {
store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java b/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
index dc159af..5cb5757 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
@@ -26,7 +26,7 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.store.cluster.impl.DistributedClusterStore;
+import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +66,7 @@
}
private void createDefaultHazelcastFile(File hazelcastFile) {
- String ip = DistributedClusterStore.getSiteLocalAddress();
+ String ip = ClusterDefinitionManager.getSiteLocalAddress();
String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
InputStream his = getClass().getResourceAsStream("/hazelcast.xml");
try {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 292330c..43af1b1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -140,7 +140,7 @@
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
- log.info("Started.");
+ log.info("Started");
}
@Deactivate
@@ -151,7 +151,7 @@
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
- log.info("Stoppped.");
+ log.info("Stopped");
}
@Override
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
index 1a106ba..0dcc6a1 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
@@ -22,7 +22,6 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
-import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import java.util.concurrent.CountDownLatch;
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index ffc67e3..8fde858 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -93,11 +93,6 @@
}
@Override
- public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
-
- }
-
- @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return null;
}