removed ClusterCommunicationAdminService and SerializationService
Change-Id: I91da0a5d65128e5ba5179b0eab41839eec706c71
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
similarity index 96%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
index 5e64a39..5708e77 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
@@ -18,7 +18,6 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
-import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,7 @@
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
.maximumSize(1000)
- .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS)
.removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
deleted file mode 100644
index 0bc31fa..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
-
-// TODO: This service interface can be removed, once we properly start
-// using ClusterService
-/**
- * Service for administering communications manager.
- */
-public interface ClusterCommunicationAdminService {
-
- /**
- * Initialize.
- */
- void initialize(ControllerNode localNode, ClusterNodesDelegate nodesDelegate);
-
- /**
- * Adds the node to the list of monitored nodes.
- *
- * @param node node to be added
- */
- void addNode(ControllerNode node);
-
- /**
- * Removes the node from the list of monitored nodes.
- *
- * @param node node to be removed
- */
- void removeNode(ControllerNode node);
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
deleted file mode 100644
index 3082718..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-// FIXME: not used any more? remove
-/**
- * Service for encoding & decoding intra-cluster message payload.
- */
-public interface SerializationService {
-
- /**
- * Decodes the specified byte buffer to obtain the message within.
- *
- * @param buffer byte buffer with message(s)
- * @return parsed message
- */
- <T> T decode(byte[] data);
-
- /**
- * Encodes the specified message into the given byte buffer.
- *
- * @param message message to be encoded
- * @param buffer byte buffer to receive the message data
- */
- byte[] encode(Object message);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 352328b..c7852ae 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,8 +4,6 @@
import java.io.IOException;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -16,9 +14,6 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
-import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
-import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -39,19 +34,13 @@
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
- implements ClusterCommunicationService, ClusterCommunicationAdminService {
+ implements ClusterCommunicationService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private ControllerNode localNode;
-
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
- private ClusterNodesDelegate nodesDelegate;
- private final Timer timer = new Timer("onos-controller-heatbeats");
- public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
-
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
@@ -72,7 +61,7 @@
@Activate
public void activate() {
- localNode = clusterService.getLocalNode();
+ ControllerNode localNode = clusterService.getLocalNode();
NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
@@ -94,6 +83,7 @@
@Override
public boolean broadcast(ClusterMessage message) throws IOException {
boolean ok = true;
+ final ControllerNode localNode = clusterService.getLocalNode();
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
@@ -105,6 +95,7 @@
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
boolean ok = true;
+ final ControllerNode localNode = clusterService.getLocalNode();
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicast(message, nodeId) && ok;
@@ -134,65 +125,6 @@
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
}
- @Override
- public void initialize(ControllerNode localNode,
- ClusterNodesDelegate delegate) {
- this.localNode = localNode;
- this.nodesDelegate = delegate;
- this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
- timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
- }
-
- @Override
- public void addNode(ControllerNode node) {
- //members.put(node.id(), node);
- }
-
- @Override
- public void removeNode(ControllerNode node) {
-// broadcast(new ClusterMessage(
-// localNode.id(),
-// new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-// SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
- //members.remove(node.id());
- }
-
- // Sends a heart beat to all peers.
- private class KeepAlive extends TimerTask {
-
- @Override
- public void run() {
- try {
- broadcast(new ClusterMessage(
- localNode.id(),
- new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
- } catch (IOException e) {
- log.warn("I/O error while broadcasting heart beats.", e);
- }
- }
- }
-
- private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
-
- ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
- ControllerNode node = event.node();
- if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
- log.info("Node {} sent a hearbeat", node.id());
- nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
- } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
- log.info("Node {} is leaving", node.id());
- nodesDelegate.nodeRemoved(node.id());
- } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
- log.info("Node {} is unreachable", node.id());
- nodesDelegate.nodeVanished(node.id());
- }
- }
- }
-
private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
deleted file mode 100644
index 07a97bc..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.onlab.onos.store.serializers.KryoPoolUtil;
-import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-//FIXME: not used any more? remove
-/**
- * Factory for parsing messages sent between cluster members.
- */
-@Component(immediate = true)
-@Service
-public class MessageSerializer implements SerializationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final int METADATA_LENGTH = 12; // 8 + 4
- private static final int LENGTH_OFFSET = 8;
-
- private static final long MARKER = 0xfeedcafebeaddeadL;
-
- private KryoPool serializerPool;
-
- @Activate
- public void activate() {
- setupKryoPool();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- /**
- * Sets up the common serialzers pool.
- */
- protected void setupKryoPool() {
- serializerPool = KryoPool.newBuilder()
- .register(KryoPoolUtil.API)
- // TODO: Should MessageSubject be in API bundle?
- .register(MessageSubject.class)
- .build()
- .populate(1);
- }
-
-
- @Override
- public <T> T decode(byte[] data) {
- return serializerPool.deserialize(data);
- }
-
- @Override
- public byte[] encode(Object payload) {
- return serializerPool.serialize(payload);
- }
-}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
index dcbe437..34e5b3b 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
@@ -40,22 +40,18 @@
@Before
public void setUp() throws Exception {
- MessageSerializer messageSerializer = new MessageSerializer();
- messageSerializer.activate();
NettyMessagingService messagingService = new NettyMessagingService();
messagingService.activate();
ccm1 = new ClusterCommunicationManager();
-// ccm1.serializationService = messageSerializer;
ccm1.activate();
ccm2 = new ClusterCommunicationManager();
-// ccm2.serializationService = messageSerializer;
ccm2.activate();
- ccm1.initialize(node1, cnd1);
- ccm2.initialize(node2, cnd2);
+// ccm1.initialize(node1, cnd1);
+// ccm2.initialize(node2, cnd2);
}
@After
@@ -70,7 +66,7 @@
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
- ccm1.addNode(node2);
+// ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
}
@@ -81,7 +77,7 @@
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
- ccm1.addNode(node2);
+// ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());