blob: b75e6a46874bb47e1917ca27b994e24cc22c0ffe [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.intent.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import com.hazelcast.core.HazelcastInstance;
21import com.hazelcast.core.IQueue;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.onos.cluster.ClusterService;
29import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.cluster.LeadershipEvent;
31import org.onlab.onos.cluster.LeadershipEventListener;
32import org.onlab.onos.cluster.LeadershipService;
33import org.onlab.onos.core.ApplicationId;
34import org.onlab.onos.core.CoreService;
35import org.onlab.onos.net.intent.IntentBatchDelegate;
36import org.onlab.onos.net.intent.IntentBatchService;
37import org.onlab.onos.net.intent.IntentOperations;
38import org.onlab.onos.store.hz.SQueue;
39import org.onlab.onos.store.hz.StoreService;
40import org.onlab.onos.store.serializers.KryoNamespaces;
41import org.onlab.onos.store.serializers.KryoSerializer;
42import org.onlab.onos.store.serializers.StoreSerializer;
43import org.onlab.util.KryoNamespace;
44import org.slf4j.Logger;
45
46import java.util.Collections;
47import java.util.Map;
48import java.util.Set;
49
alshabib978d2412014-11-29 15:29:15 -080050
Brian O'Connor72a034c2014-11-26 18:24:23 -080051import static com.google.common.base.Preconditions.checkNotNull;
52import static com.google.common.base.Preconditions.checkState;
53import static org.slf4j.LoggerFactory.getLogger;
54
55@Component(immediate = true)
56@Service
57public class HazelcastIntentBatchQueue
58 implements IntentBatchService {
59
60 private final Logger log = getLogger(getClass());
61 private static final String TOPIC_BASE = "intent-batch-";
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected CoreService coreService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected ClusterService clusterService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected LeadershipService leadershipService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected StoreService storeService;
74
75 private HazelcastInstance theInstance;
76 private ControllerNode localControllerNode;
77 protected StoreSerializer serializer;
78 private IntentBatchDelegate delegate;
79 private InternalLeaderListener leaderListener = new InternalLeaderListener();
80 private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
81 = Maps.newHashMap(); // FIXME make distributed?
82 private final Set<ApplicationId> myTopics = Sets.newHashSet();
83 private final Map<ApplicationId, IntentOperations> outstandingOps
84 = Maps.newHashMap();
85
86 @Activate
87 public void activate() {
88 theInstance = storeService.getHazelcastInstance();
89 localControllerNode = clusterService.getLocalNode();
90 leadershipService.addListener(leaderListener);
91
92 serializer = new KryoSerializer() {
93
94 @Override
95 protected void setupKryoPool() {
96 serializerPool = KryoNamespace.newBuilder()
97 .setRegistrationRequired(false)
98 .register(KryoNamespaces.API)
99 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
100 .build();
101 }
102
103 };
104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 leadershipService.removeListener(leaderListener);
110 log.info("Stopped");
111 }
112
113 public static String getTopic(ApplicationId appId) {
114 return TOPIC_BASE + checkNotNull(appId.id());
115 }
116
117 public ApplicationId getAppId(String topic) {
118 checkState(topic.startsWith(TOPIC_BASE),
119 "Trying to get app id for invalid topic: {}", topic);
120 Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
121 return coreService.getAppId(id);
122 }
123
124 private SQueue<IntentOperations> getQueue(ApplicationId appId) {
125 SQueue<IntentOperations> queue = batchQueues.get(appId);
126 if (queue == null) {
127 synchronized (this) {
128 // FIXME how will other instances find out about new queues
129 String topic = getTopic(appId);
130 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
131 queue = new SQueue<>(rawQueue, serializer);
132 batchQueues.putIfAbsent(appId, queue);
133 // TODO others should run for leadership when they hear about this topic
134 leadershipService.runForLeadership(topic);
135 }
136 }
137 return queue;
138 }
139
140 @Override
141 public void addIntentOperations(IntentOperations ops) {
142 checkNotNull(ops, "Intent operations cannot be null.");
143 ApplicationId appId = ops.appId();
144 getQueue(appId).add(ops); // TODO consider using put here
145 dispatchNextOperation(appId);
146 }
147
148 @Override
149 public void removeIntentOperations(IntentOperations ops) {
150 ApplicationId appId = ops.appId();
151 synchronized (this) {
152 checkState(outstandingOps.get(appId).equals(ops),
153 "Operations not found.");
154 SQueue<IntentOperations> queue = batchQueues.get(appId);
155 // TODO consider alternatives to remove
156 checkState(queue.remove().equals(ops),
157 "Operations are wrong.");
158 outstandingOps.remove(appId);
159 dispatchNextOperation(appId);
160 }
161 }
162
163 /**
164 * Dispatches the next available operations to the delegate, unless
165 * we are not the leader for this application id or there is an
166 * outstanding operations for this application id.
167 *
168 * @param appId application id
169 */
170 private void dispatchNextOperation(ApplicationId appId) {
171 synchronized (this) {
172 if (!myTopics.contains(appId) ||
173 outstandingOps.containsKey(appId)) {
174 return;
175 }
176 IntentOperations ops = batchQueues.get(appId).peek();
177 if (ops != null) {
178 outstandingOps.put(appId, ops);
179 delegate.execute(ops);
180 }
181 }
182 }
183
184 /**
185 * Record the leadership change for the given topic. If we have become the
186 * leader, then dispatch the next operations. If we have lost leadership,
187 * then cancel the last operations.
188 *
189 * @param topic topic based on application id
190 * @param leader true if we have become the leader, false otherwise
191 */
192 private void leaderChanged(String topic, boolean leader) {
193 ApplicationId appId = getAppId(topic);
194 //TODO we are using the event caller's thread, should we use our own?
195 synchronized (this) {
196 if (leader) {
197 myTopics.add(appId);
198 checkState(!outstandingOps.containsKey(appId),
199 "Existing intent ops for app id: {}", appId);
200 dispatchNextOperation(appId);
201 } else {
202 myTopics.remove(appId);
203 IntentOperations ops = outstandingOps.get(appId);
204 if (ops != null) {
205 delegate.cancel(ops);
206 }
207 outstandingOps.remove(appId);
208 }
209 }
210 }
211
212 private class InternalLeaderListener implements LeadershipEventListener {
213 @Override
214 public void event(LeadershipEvent event) {
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800215 log.trace("Leadership Event: time = {} type = {} event = {}",
Brian O'Connor72a034c2014-11-26 18:24:23 -0800216 event.time(), event.type(), event);
217
218 String topic = event.subject().topic();
219 if (!topic.startsWith(TOPIC_BASE)) {
220 return; // Not our topic: ignore
221 }
222 if (!event.subject().leader().id().equals(localControllerNode.id())) {
223 return; // The event is not about this instance: ignore
224 }
225
226 switch (event.type()) {
227 case LEADER_ELECTED:
228 log.info("Elected leader for app {}", getAppId(topic));
229 leaderChanged(topic, true);
230 break;
231 case LEADER_BOOTED:
232 log.info("Lost leader election for app {}", getAppId(topic));
233 leaderChanged(topic, false);
234 break;
235 case LEADER_REELECTED:
236 break;
237 default:
238 break;
239 }
240 }
241 }
242
243 @Override
244 public Set<IntentOperations> getPendingOperations() {
245 Set<IntentOperations> ops = Sets.newHashSet();
246 synchronized (this) {
247 for (SQueue<IntentOperations> queue : batchQueues.values()) {
248 ops.addAll(queue);
249 }
250 return ops;
251 }
252 }
253
254 @Override
255 public Set<IntentOperations> getCurrentOperations() {
256 //FIXME this is not really implemented
257 return Collections.emptySet();
258 }
259
260 @Override
261 public boolean isLocalLeader(ApplicationId applicationId) {
262 return myTopics.contains(applicationId);
263 }
264
265 @Override
266 public void setDelegate(IntentBatchDelegate delegate) {
267 this.delegate = checkNotNull(delegate, "Delegate cannot be null");
268 }
269
270 @Override
271 public void unsetDelegate(IntentBatchDelegate delegate) {
272 if (this.delegate != null && this.delegate.equals(delegate)) {
273 this.delegate = null;
274 }
275 }
276}