blob: 9bd25b47ec72827ace8b4b8861df8de165ae6e99 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
15import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
17import org.apache.felix.scr.annotations.Service;
18import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.NodeId;
20import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25import org.onlab.onos.store.cluster.messaging.MessageSubject;
26import org.onlab.onos.store.messaging.Endpoint;
27import org.onlab.onos.store.messaging.Message;
28import org.onlab.onos.store.messaging.MessageHandler;
29import org.onlab.onos.store.messaging.MessagingService;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33@Component(immediate = true)
34@Service
35public class OnosClusterCommunicationManager
36 implements ClusterCommunicationService, ClusterCommunicationAdminService {
37
38 private final Logger log = LoggerFactory.getLogger(getClass());
39
40 private ControllerNode localNode;
41 private ClusterNodesDelegate nodesDelegate;
42 private Map<NodeId, ControllerNode> members = new HashMap<>();
43 private final Timer timer = new Timer("onos-controller-heatbeats");
44 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
45
46 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
47 private MessagingService messagingService;
48
49 @Activate
50 public void activate() {
51 log.info("Started");
52 }
53
54 @Deactivate
55 public void deactivate() {
56 log.info("Stopped");
57 }
58
59 @Override
60 public boolean broadcast(ClusterMessage message) {
61 boolean ok = true;
62 for (ControllerNode node : members.values()) {
63 if (!node.equals(localNode)) {
64 ok = unicast(message, node.id()) && ok;
65 }
66 }
67 return ok;
68 }
69
70 @Override
71 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
72 boolean ok = true;
73 for (NodeId nodeId : nodes) {
74 if (!nodeId.equals(localNode.id())) {
75 ok = unicast(message, nodeId) && ok;
76 }
77 }
78 return ok;
79 }
80
81 @Override
82 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
83 ControllerNode node = members.get(toNodeId);
84 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
85 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
86 try {
87 messagingService.sendAsync(nodeEp, message.subject().value(), message);
88 return true;
89 } catch (IOException e) {
90 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
91 }
92
93 return false;
94 }
95
96 @Override
97 public void addSubscriber(MessageSubject subject,
98 ClusterMessageHandler subscriber) {
99 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
100 }
101
102 @Override
103 public void initialize(ControllerNode localNode,
104 ClusterNodesDelegate delegate) {
105 this.localNode = localNode;
106 this.nodesDelegate = delegate;
107 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
108 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
109 }
110
111 @Override
112 public void addNode(ControllerNode node) {
113 members.put(node.id(), node);
114 }
115
116 @Override
117 public void removeNode(ControllerNode node) {
118 broadcast(new ClusterMessage(
119 localNode.id(),
120 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
121 new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
122 members.remove(node.id());
123 }
124
125 // Sends a heart beat to all peers.
126 private class KeepAlive extends TimerTask {
127
128 @Override
129 public void run() {
130 broadcast(new ClusterMessage(
131 localNode.id(),
132 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
133 new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
134 }
135 }
136
137 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
138
139 @Override
140 public void handle(ClusterMessage message) {
141
142 ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
143 ControllerNode node = event.node();
144 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
145 log.info("Node {} sent a hearbeat", node.id());
146 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
147 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
148 log.info("Node {} is leaving", node.id());
149 nodesDelegate.nodeRemoved(node.id());
150 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
151 log.info("Node {} is unreachable", node.id());
152 nodesDelegate.nodeVanished(node.id());
153 }
154 }
155 }
156
157 private static class InternalClusterMessageHandler implements MessageHandler {
158
159 private final ClusterMessageHandler handler;
160
161 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
162 this.handler = handler;
163 }
164
165 @Override
166 public void handle(Message message) {
167 handler.handle((ClusterMessage) message.payload());
168 }
169 }
170}