Remove ClusterCommunicationAdminService and SerializationService
Change-Id: I91da0a5d65128e5ba5179b0eab41839eec706c71
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 352328b..c7852ae 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,8 +4,6 @@
import java.io.IOException;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -16,9 +14,6 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
-import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
-import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -39,19 +34,13 @@
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
- implements ClusterCommunicationService, ClusterCommunicationAdminService {
+ implements ClusterCommunicationService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private ControllerNode localNode;
-
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
- private ClusterNodesDelegate nodesDelegate;
- private final Timer timer = new Timer("onos-controller-heatbeats");
- public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
-
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
@@ -72,7 +61,7 @@
@Activate
public void activate() {
- localNode = clusterService.getLocalNode();
+ ControllerNode localNode = clusterService.getLocalNode();
NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
@@ -94,6 +83,7 @@
@Override
public boolean broadcast(ClusterMessage message) throws IOException {
boolean ok = true;
+ final ControllerNode localNode = clusterService.getLocalNode();
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
@@ -105,6 +95,7 @@
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
boolean ok = true;
+ final ControllerNode localNode = clusterService.getLocalNode();
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicast(message, nodeId) && ok;
@@ -134,65 +125,6 @@
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
}
- @Override
- public void initialize(ControllerNode localNode,
- ClusterNodesDelegate delegate) {
- this.localNode = localNode;
- this.nodesDelegate = delegate;
- this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
- timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
- }
-
- @Override
- public void addNode(ControllerNode node) {
- //members.put(node.id(), node);
- }
-
- @Override
- public void removeNode(ControllerNode node) {
-// broadcast(new ClusterMessage(
-// localNode.id(),
-// new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-// SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
- //members.remove(node.id());
- }
-
- // Sends a heart beat to all peers.
- private class KeepAlive extends TimerTask {
-
- @Override
- public void run() {
- try {
- broadcast(new ClusterMessage(
- localNode.id(),
- new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
- } catch (IOException e) {
- log.warn("I/O error while broadcasting heart beats.", e);
- }
- }
- }
-
- private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
-
- ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
- ControllerNode node = event.node();
- if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
- log.info("Node {} sent a hearbeat", node.id());
- nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
- } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
- log.info("Node {} is leaving", node.id());
- nodesDelegate.nodeRemoved(node.id());
- } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
- log.info("Node {} is unreachable", node.id());
- nodesDelegate.nodeVanished(node.id());
- }
- }
- }
-
private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;