blob: 9d9b8339ef6a6cd0c2ed5bc5370d9f174ba9b1f4 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.intent.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import com.hazelcast.core.HazelcastInstance;
21import com.hazelcast.core.IQueue;
alshabiba9819bf2014-11-30 18:15:52 -080022import com.hazelcast.core.ItemEvent;
23import com.hazelcast.core.ItemListener;
24
Brian O'Connor72a034c2014-11-26 18:24:23 -080025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onlab.onos.cluster.ClusterService;
32import org.onlab.onos.cluster.ControllerNode;
33import org.onlab.onos.cluster.LeadershipEvent;
34import org.onlab.onos.cluster.LeadershipEventListener;
35import org.onlab.onos.cluster.LeadershipService;
36import org.onlab.onos.core.ApplicationId;
37import org.onlab.onos.core.CoreService;
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080038import org.onlab.onos.event.AbstractListenerRegistry;
39import org.onlab.onos.event.EventDeliveryService;
Brian O'Connor72a034c2014-11-26 18:24:23 -080040import org.onlab.onos.net.intent.IntentBatchDelegate;
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080041import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
42import org.onlab.onos.net.intent.IntentBatchListener;
Brian O'Connor72a034c2014-11-26 18:24:23 -080043import org.onlab.onos.net.intent.IntentBatchService;
44import org.onlab.onos.net.intent.IntentOperations;
45import org.onlab.onos.store.hz.SQueue;
46import org.onlab.onos.store.hz.StoreService;
47import org.onlab.onos.store.serializers.KryoNamespaces;
48import org.onlab.onos.store.serializers.KryoSerializer;
49import org.onlab.onos.store.serializers.StoreSerializer;
50import org.onlab.util.KryoNamespace;
51import org.slf4j.Logger;
52
Brian O'Connor72a034c2014-11-26 18:24:23 -080053import java.util.Map;
54import java.util.Set;
55
56import static com.google.common.base.Preconditions.checkNotNull;
57import static com.google.common.base.Preconditions.checkState;
58import static org.slf4j.LoggerFactory.getLogger;
59
60@Component(immediate = true)
61@Service
62public class HazelcastIntentBatchQueue
63 implements IntentBatchService {
64
65 private final Logger log = getLogger(getClass());
66 private static final String TOPIC_BASE = "intent-batch-";
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected CoreService coreService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ClusterService clusterService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected LeadershipService leadershipService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected StoreService storeService;
79
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected EventDeliveryService eventDispatcher;
82
83
Brian O'Connor72a034c2014-11-26 18:24:23 -080084 private HazelcastInstance theInstance;
85 private ControllerNode localControllerNode;
86 protected StoreSerializer serializer;
87 private IntentBatchDelegate delegate;
88 private InternalLeaderListener leaderListener = new InternalLeaderListener();
89 private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
90 = Maps.newHashMap(); // FIXME make distributed?
91 private final Set<ApplicationId> myTopics = Sets.newHashSet();
92 private final Map<ApplicationId, IntentOperations> outstandingOps
93 = Maps.newHashMap();
94
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080095 private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
96 listenerRegistry = new AbstractListenerRegistry<>();
97
Brian O'Connor72a034c2014-11-26 18:24:23 -080098 @Activate
99 public void activate() {
100 theInstance = storeService.getHazelcastInstance();
101 localControllerNode = clusterService.getLocalNode();
102 leadershipService.addListener(leaderListener);
103
104 serializer = new KryoSerializer() {
105
106 @Override
107 protected void setupKryoPool() {
108 serializerPool = KryoNamespace.newBuilder()
109 .setRegistrationRequired(false)
110 .register(KryoNamespaces.API)
111 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
112 .build();
113 }
114
115 };
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800116 eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800117 log.info("Started");
118 }
119
120 @Deactivate
121 public void deactivate() {
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800122 eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800123 leadershipService.removeListener(leaderListener);
alshabiba9819bf2014-11-30 18:15:52 -0800124 for (ApplicationId appId: batchQueues.keySet()) {
125 leadershipService.withdraw(getTopic(appId));
126 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800127 log.info("Stopped");
128 }
129
130 public static String getTopic(ApplicationId appId) {
131 return TOPIC_BASE + checkNotNull(appId.id());
132 }
133
134 public ApplicationId getAppId(String topic) {
135 checkState(topic.startsWith(TOPIC_BASE),
136 "Trying to get app id for invalid topic: {}", topic);
137 Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
138 return coreService.getAppId(id);
139 }
140
141 private SQueue<IntentOperations> getQueue(ApplicationId appId) {
142 SQueue<IntentOperations> queue = batchQueues.get(appId);
143 if (queue == null) {
144 synchronized (this) {
Brian O'Connor72a034c2014-11-26 18:24:23 -0800145 String topic = getTopic(appId);
146 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
147 queue = new SQueue<>(rawQueue, serializer);
alshabiba9819bf2014-11-30 18:15:52 -0800148 queue.addItemListener(new InternalItemListener(appId), false);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800149 batchQueues.putIfAbsent(appId, queue);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800150 leadershipService.runForLeadership(topic);
151 }
152 }
153 return queue;
154 }
155
156 @Override
157 public void addIntentOperations(IntentOperations ops) {
158 checkNotNull(ops, "Intent operations cannot be null.");
159 ApplicationId appId = ops.appId();
160 getQueue(appId).add(ops); // TODO consider using put here
161 dispatchNextOperation(appId);
162 }
163
164 @Override
165 public void removeIntentOperations(IntentOperations ops) {
166 ApplicationId appId = ops.appId();
167 synchronized (this) {
168 checkState(outstandingOps.get(appId).equals(ops),
169 "Operations not found.");
170 SQueue<IntentOperations> queue = batchQueues.get(appId);
171 // TODO consider alternatives to remove
172 checkState(queue.remove().equals(ops),
173 "Operations are wrong.");
174 outstandingOps.remove(appId);
175 dispatchNextOperation(appId);
176 }
177 }
178
179 /**
180 * Dispatches the next available operations to the delegate, unless
181 * we are not the leader for this application id or there is an
182 * outstanding operations for this application id.
183 *
184 * @param appId application id
185 */
186 private void dispatchNextOperation(ApplicationId appId) {
187 synchronized (this) {
188 if (!myTopics.contains(appId) ||
189 outstandingOps.containsKey(appId)) {
190 return;
191 }
192 IntentOperations ops = batchQueues.get(appId).peek();
193 if (ops != null) {
194 outstandingOps.put(appId, ops);
195 delegate.execute(ops);
196 }
197 }
198 }
199
200 /**
201 * Record the leadership change for the given topic. If we have become the
202 * leader, then dispatch the next operations. If we have lost leadership,
203 * then cancel the last operations.
204 *
205 * @param topic topic based on application id
206 * @param leader true if we have become the leader, false otherwise
207 */
208 private void leaderChanged(String topic, boolean leader) {
209 ApplicationId appId = getAppId(topic);
210 //TODO we are using the event caller's thread, should we use our own?
211 synchronized (this) {
212 if (leader) {
213 myTopics.add(appId);
214 checkState(!outstandingOps.containsKey(appId),
215 "Existing intent ops for app id: {}", appId);
216 dispatchNextOperation(appId);
217 } else {
218 myTopics.remove(appId);
219 IntentOperations ops = outstandingOps.get(appId);
220 if (ops != null) {
221 delegate.cancel(ops);
222 }
223 outstandingOps.remove(appId);
224 }
225 }
226 }
227
alshabiba9819bf2014-11-30 18:15:52 -0800228 private class InternalItemListener implements ItemListener<IntentOperations> {
229
230 private final ApplicationId appId;
231
232 public InternalItemListener(ApplicationId appId) {
233 this.appId = appId;
234 }
235
236 @Override
237 public void itemAdded(ItemEvent<IntentOperations> item) {
238 dispatchNextOperation(appId);
239 }
240
241 @Override
242 public void itemRemoved(ItemEvent<IntentOperations> item) {
243 // no-op
244 }
245 }
246
Brian O'Connor72a034c2014-11-26 18:24:23 -0800247 private class InternalLeaderListener implements LeadershipEventListener {
248 @Override
249 public void event(LeadershipEvent event) {
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800250 log.trace("Leadership Event: time = {} type = {} event = {}",
Brian O'Connor72a034c2014-11-26 18:24:23 -0800251 event.time(), event.type(), event);
252
253 String topic = event.subject().topic();
254 if (!topic.startsWith(TOPIC_BASE)) {
255 return; // Not our topic: ignore
256 }
257 if (!event.subject().leader().id().equals(localControllerNode.id())) {
alshabiba9819bf2014-11-30 18:15:52 -0800258 // run for leadership
259 getQueue(getAppId(topic));
Brian O'Connor72a034c2014-11-26 18:24:23 -0800260 return; // The event is not about this instance: ignore
261 }
262
263 switch (event.type()) {
264 case LEADER_ELECTED:
265 log.info("Elected leader for app {}", getAppId(topic));
266 leaderChanged(topic, true);
267 break;
268 case LEADER_BOOTED:
269 log.info("Lost leader election for app {}", getAppId(topic));
270 leaderChanged(topic, false);
271 break;
272 case LEADER_REELECTED:
273 break;
274 default:
275 break;
276 }
277 }
278 }
279
280 @Override
281 public Set<IntentOperations> getPendingOperations() {
282 Set<IntentOperations> ops = Sets.newHashSet();
283 synchronized (this) {
284 for (SQueue<IntentOperations> queue : batchQueues.values()) {
285 ops.addAll(queue);
286 }
287 return ops;
288 }
289 }
290
291 @Override
Brian O'Connor72a034c2014-11-26 18:24:23 -0800292 public boolean isLocalLeader(ApplicationId applicationId) {
293 return myTopics.contains(applicationId);
294 }
295
296 @Override
297 public void setDelegate(IntentBatchDelegate delegate) {
298 this.delegate = checkNotNull(delegate, "Delegate cannot be null");
299 }
300
301 @Override
302 public void unsetDelegate(IntentBatchDelegate delegate) {
303 if (this.delegate != null && this.delegate.equals(delegate)) {
304 this.delegate = null;
305 }
306 }
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800307
308 @Override
309 public void addListener(IntentBatchListener listener) {
310 listenerRegistry.addListener(listener);
311 }
312
313 @Override
314 public void removeListener(IntentBatchListener listener) {
315 listenerRegistry.removeListener(listener);
316 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800317}