blob: 62eed8ef8194f39b942a567ee9f7ac155c6c79a6 [file] [log] [blame]
tomcb43d602014-09-28 22:46:16 -07001package org.onlab.onos.ccc;
2
3import com.google.common.collect.ImmutableSet;
4import org.apache.felix.scr.annotations.Activate;
5import org.apache.felix.scr.annotations.Component;
6import org.apache.felix.scr.annotations.Deactivate;
7import org.apache.felix.scr.annotations.Service;
8import org.onlab.nio.AcceptorLoop;
9import org.onlab.nio.IOLoop;
10import org.onlab.nio.MessageStream;
11import org.onlab.onos.cluster.ClusterEvent;
12import org.onlab.onos.cluster.ClusterStore;
13import org.onlab.onos.cluster.ClusterStoreDelegate;
14import org.onlab.onos.cluster.ControllerNode;
15import org.onlab.onos.cluster.DefaultControllerNode;
16import org.onlab.onos.cluster.NodeId;
17import org.onlab.onos.store.AbstractStore;
18import org.onlab.packet.IpPrefix;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.io.IOException;
23import java.net.InetSocketAddress;
24import java.nio.channels.ByteChannel;
25import java.nio.channels.ServerSocketChannel;
26import java.util.ArrayList;
27import java.util.List;
28import java.util.Map;
29import java.util.Set;
30import java.util.concurrent.ConcurrentHashMap;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33
34import static java.net.InetAddress.getByAddress;
35import static org.onlab.onos.cluster.ControllerNode.State;
36import static org.onlab.packet.IpPrefix.valueOf;
37import static org.onlab.util.Tools.namedThreads;
38
39/**
40 * Distributed implementation of the cluster nodes store.
41 */
42@Component(immediate = true)
43@Service
44public class DistributedClusterStore
45 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
46 implements ClusterStore {
47
48 private final Logger log = LoggerFactory.getLogger(getClass());
49
50 private static final long SELECT_TIMEOUT = 50;
51 private static final int WORKERS = 3;
52 private static final int COMM_BUFFER_SIZE = 16 * 1024;
53 private static final int COMM_IDLE_TIME = 500;
54
55 private DefaultControllerNode self;
56 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
57 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
58
59 private final ExecutorService listenExecutor =
60 Executors.newSingleThreadExecutor(namedThreads("onos-listen"));
61 private final ExecutorService commExecutors =
62 Executors.newFixedThreadPool(WORKERS, namedThreads("onos-cluster"));
63 private final ExecutorService heartbeatExecutor =
64 Executors.newSingleThreadExecutor(namedThreads("onos-heartbeat"));
65
66 private ListenLoop listenLoop;
67 private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
68
69 @Activate
70 public void activate() {
71 establishIdentity();
72 startCommunications();
73 startListening();
74 log.info("Started");
75 }
76
77 private void startCommunications() {
78 for (int i = 0; i < WORKERS; i++) {
79 try {
80 CommLoop loop = new CommLoop();
81 commLoops.add(loop);
82 commExecutors.execute(loop);
83 } catch (IOException e) {
84 log.warn("Unable to start comm IO loop", e);
85 }
86 }
87 }
88
89 // Starts listening for connections from peer cluster members.
90 private void startListening() {
91 try {
92 listenLoop = new ListenLoop(self.ip(), self.tcpPort());
93 listenExecutor.execute(listenLoop);
94 } catch (IOException e) {
95 log.error("Unable to listen for cluster connections", e);
96 }
97 }
98
99 // Establishes the controller's own identity.
100 private void establishIdentity() {
101 // For now rely on env. variable.
102 IpPrefix ip = valueOf(System.getenv("ONOS_NIC"));
103 self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
104 }
105
106 @Deactivate
107 public void deactivate() {
108 listenLoop.shutdown();
109 for (CommLoop loop : commLoops) {
110 loop.shutdown();
111 }
112 log.info("Stopped");
113 }
114
115 @Override
116 public ControllerNode getLocalNode() {
117 return self;
118 }
119
120 @Override
121 public Set<ControllerNode> getNodes() {
122 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
123 return builder.addAll(nodes.values()).build();
124 }
125
126 @Override
127 public ControllerNode getNode(NodeId nodeId) {
128 return nodes.get(nodeId);
129 }
130
131 @Override
132 public State getState(NodeId nodeId) {
133 State state = states.get(nodeId);
134 return state == null ? State.INACTIVE : state;
135 }
136
137 @Override
138 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
139 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
140 nodes.put(nodeId, node);
141 return node;
142 }
143
144 @Override
145 public void removeNode(NodeId nodeId) {
146 nodes.remove(nodeId);
147 }
148
149 // Listens and accepts inbound connections from other cluster nodes.
150 private class ListenLoop extends AcceptorLoop {
151 ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
152 super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
153 }
154
155 @Override
156 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
157
158 }
159 }
160
161 private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
162 CommLoop() throws IOException {
163 super(SELECT_TIMEOUT);
164 }
165
166 @Override
167 protected TLVMessageStream createStream(ByteChannel byteChannel) {
168 return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
169 }
170
171 @Override
172 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
173
174 }
175 }
176}