blob: 3f52cd185fbd4d47d61b28b9228d95645211c5ca [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Brian O'Connor72a034c2014-11-26 18:24:23 -080017
Madan Jampani8d21c792014-12-01 16:31:07 -080018import static com.google.common.base.Preconditions.checkNotNull;
19import static com.google.common.base.Preconditions.checkState;
20import static org.slf4j.LoggerFactory.getLogger;
21
22import java.util.Map;
Praseed Balakrishnan1516f482014-12-03 16:33:44 -080023import java.util.Objects;
Madan Jampani8d21c792014-12-01 16:31:07 -080024import java.util.Set;
alshabiba9819bf2014-11-30 18:15:52 -080025
Brian O'Connor72a034c2014-11-26 18:24:23 -080026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.LeadershipEvent;
34import org.onosproject.cluster.LeadershipEventListener;
35import org.onosproject.cluster.LeadershipService;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.CoreService;
39import org.onosproject.event.AbstractListenerRegistry;
40import org.onosproject.event.EventDeliveryService;
41import org.onosproject.net.intent.IntentBatchDelegate;
42import org.onosproject.net.intent.IntentBatchLeaderEvent;
43import org.onosproject.net.intent.IntentBatchListener;
44import org.onosproject.net.intent.IntentBatchService;
45import org.onosproject.net.intent.IntentOperations;
46import org.onosproject.store.hz.SQueue;
47import org.onosproject.store.hz.StoreService;
48import org.onosproject.store.serializers.KryoNamespaces;
49import org.onosproject.store.serializers.KryoSerializer;
50import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor72a034c2014-11-26 18:24:23 -080051import org.onlab.util.KryoNamespace;
52import org.slf4j.Logger;
53
Madan Jampani8d21c792014-12-01 16:31:07 -080054import com.google.common.collect.Maps;
55import com.google.common.collect.Sets;
56import com.hazelcast.core.HazelcastInstance;
57import com.hazelcast.core.IQueue;
58import com.hazelcast.core.ItemEvent;
59import com.hazelcast.core.ItemListener;
Brian O'Connor72a034c2014-11-26 18:24:23 -080060
61@Component(immediate = true)
62@Service
63public class HazelcastIntentBatchQueue
64 implements IntentBatchService {
65
66 private final Logger log = getLogger(getClass());
67 private static final String TOPIC_BASE = "intent-batch-";
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected CoreService coreService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected ClusterService clusterService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected LeadershipService leadershipService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected StoreService storeService;
80
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080081 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected EventDeliveryService eventDispatcher;
83
84
Brian O'Connor72a034c2014-11-26 18:24:23 -080085 private HazelcastInstance theInstance;
Madan Jampani8d21c792014-12-01 16:31:07 -080086 private NodeId localControllerNodeId;
Brian O'Connor72a034c2014-11-26 18:24:23 -080087 protected StoreSerializer serializer;
88 private IntentBatchDelegate delegate;
89 private InternalLeaderListener leaderListener = new InternalLeaderListener();
90 private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
91 = Maps.newHashMap(); // FIXME make distributed?
92 private final Set<ApplicationId> myTopics = Sets.newHashSet();
93 private final Map<ApplicationId, IntentOperations> outstandingOps
94 = Maps.newHashMap();
95
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080096 private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
97 listenerRegistry = new AbstractListenerRegistry<>();
98
Brian O'Connor72a034c2014-11-26 18:24:23 -080099 @Activate
100 public void activate() {
101 theInstance = storeService.getHazelcastInstance();
Madan Jampani8d21c792014-12-01 16:31:07 -0800102 localControllerNodeId = clusterService.getLocalNode().id();
Brian O'Connor72a034c2014-11-26 18:24:23 -0800103 leadershipService.addListener(leaderListener);
104
105 serializer = new KryoSerializer() {
106
107 @Override
108 protected void setupKryoPool() {
109 serializerPool = KryoNamespace.newBuilder()
110 .setRegistrationRequired(false)
111 .register(KryoNamespaces.API)
112 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
113 .build();
114 }
115
116 };
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800117 eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800118 log.info("Started");
119 }
120
121 @Deactivate
122 public void deactivate() {
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800123 eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800124 leadershipService.removeListener(leaderListener);
alshabiba9819bf2014-11-30 18:15:52 -0800125 for (ApplicationId appId: batchQueues.keySet()) {
126 leadershipService.withdraw(getTopic(appId));
127 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800128 log.info("Stopped");
129 }
130
131 public static String getTopic(ApplicationId appId) {
132 return TOPIC_BASE + checkNotNull(appId.id());
133 }
134
135 public ApplicationId getAppId(String topic) {
136 checkState(topic.startsWith(TOPIC_BASE),
137 "Trying to get app id for invalid topic: {}", topic);
138 Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
139 return coreService.getAppId(id);
140 }
141
142 private SQueue<IntentOperations> getQueue(ApplicationId appId) {
143 SQueue<IntentOperations> queue = batchQueues.get(appId);
144 if (queue == null) {
145 synchronized (this) {
Brian O'Connor72a034c2014-11-26 18:24:23 -0800146 String topic = getTopic(appId);
147 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
148 queue = new SQueue<>(rawQueue, serializer);
alshabiba9819bf2014-11-30 18:15:52 -0800149 queue.addItemListener(new InternalItemListener(appId), false);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800150 batchQueues.putIfAbsent(appId, queue);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800151 leadershipService.runForLeadership(topic);
152 }
153 }
154 return queue;
155 }
156
157 @Override
158 public void addIntentOperations(IntentOperations ops) {
159 checkNotNull(ops, "Intent operations cannot be null.");
160 ApplicationId appId = ops.appId();
161 getQueue(appId).add(ops); // TODO consider using put here
162 dispatchNextOperation(appId);
163 }
164
165 @Override
166 public void removeIntentOperations(IntentOperations ops) {
167 ApplicationId appId = ops.appId();
168 synchronized (this) {
Praseed Balakrishnan1516f482014-12-03 16:33:44 -0800169 IntentOperations outstanding = outstandingOps.get(appId);
170 if (outstanding != null) {
171 checkState(Objects.equals(ops, outstanding),
172 "Operation {} does not match outstanding operation {}",
173 ops, outstanding);
174 } else {
175 log.warn("Operation {} not found", ops);
176 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800177 SQueue<IntentOperations> queue = batchQueues.get(appId);
178 // TODO consider alternatives to remove
179 checkState(queue.remove().equals(ops),
180 "Operations are wrong.");
181 outstandingOps.remove(appId);
182 dispatchNextOperation(appId);
183 }
184 }
185
186 /**
187 * Dispatches the next available operations to the delegate, unless
188 * we are not the leader for this application id or there is an
189 * outstanding operations for this application id.
190 *
191 * @param appId application id
192 */
193 private void dispatchNextOperation(ApplicationId appId) {
194 synchronized (this) {
195 if (!myTopics.contains(appId) ||
196 outstandingOps.containsKey(appId)) {
197 return;
198 }
199 IntentOperations ops = batchQueues.get(appId).peek();
200 if (ops != null) {
201 outstandingOps.put(appId, ops);
202 delegate.execute(ops);
203 }
204 }
205 }
206
207 /**
208 * Record the leadership change for the given topic. If we have become the
209 * leader, then dispatch the next operations. If we have lost leadership,
210 * then cancel the last operations.
211 *
212 * @param topic topic based on application id
213 * @param leader true if we have become the leader, false otherwise
214 */
215 private void leaderChanged(String topic, boolean leader) {
216 ApplicationId appId = getAppId(topic);
217 //TODO we are using the event caller's thread, should we use our own?
218 synchronized (this) {
219 if (leader) {
220 myTopics.add(appId);
221 checkState(!outstandingOps.containsKey(appId),
222 "Existing intent ops for app id: {}", appId);
223 dispatchNextOperation(appId);
224 } else {
225 myTopics.remove(appId);
226 IntentOperations ops = outstandingOps.get(appId);
227 if (ops != null) {
228 delegate.cancel(ops);
229 }
230 outstandingOps.remove(appId);
231 }
232 }
233 }
234
alshabiba9819bf2014-11-30 18:15:52 -0800235 private class InternalItemListener implements ItemListener<IntentOperations> {
236
237 private final ApplicationId appId;
238
239 public InternalItemListener(ApplicationId appId) {
240 this.appId = appId;
241 }
242
243 @Override
244 public void itemAdded(ItemEvent<IntentOperations> item) {
245 dispatchNextOperation(appId);
246 }
247
248 @Override
249 public void itemRemoved(ItemEvent<IntentOperations> item) {
250 // no-op
251 }
252 }
253
Brian O'Connor72a034c2014-11-26 18:24:23 -0800254 private class InternalLeaderListener implements LeadershipEventListener {
255 @Override
256 public void event(LeadershipEvent event) {
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800257 log.trace("Leadership Event: time = {} type = {} event = {}",
Brian O'Connor72a034c2014-11-26 18:24:23 -0800258 event.time(), event.type(), event);
259
260 String topic = event.subject().topic();
261 if (!topic.startsWith(TOPIC_BASE)) {
262 return; // Not our topic: ignore
263 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800264 if (!event.subject().leader().equals(localControllerNodeId)) {
alshabiba9819bf2014-11-30 18:15:52 -0800265 // run for leadership
266 getQueue(getAppId(topic));
Brian O'Connor72a034c2014-11-26 18:24:23 -0800267 return; // The event is not about this instance: ignore
268 }
269
270 switch (event.type()) {
271 case LEADER_ELECTED:
272 log.info("Elected leader for app {}", getAppId(topic));
273 leaderChanged(topic, true);
274 break;
275 case LEADER_BOOTED:
276 log.info("Lost leader election for app {}", getAppId(topic));
277 leaderChanged(topic, false);
278 break;
279 case LEADER_REELECTED:
280 break;
281 default:
282 break;
283 }
284 }
285 }
286
287 @Override
288 public Set<IntentOperations> getPendingOperations() {
289 Set<IntentOperations> ops = Sets.newHashSet();
290 synchronized (this) {
291 for (SQueue<IntentOperations> queue : batchQueues.values()) {
292 ops.addAll(queue);
293 }
294 return ops;
295 }
296 }
297
298 @Override
Brian O'Connor72a034c2014-11-26 18:24:23 -0800299 public boolean isLocalLeader(ApplicationId applicationId) {
300 return myTopics.contains(applicationId);
301 }
302
303 @Override
304 public void setDelegate(IntentBatchDelegate delegate) {
305 this.delegate = checkNotNull(delegate, "Delegate cannot be null");
306 }
307
308 @Override
309 public void unsetDelegate(IntentBatchDelegate delegate) {
310 if (this.delegate != null && this.delegate.equals(delegate)) {
311 this.delegate = null;
312 }
313 }
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800314
315 @Override
316 public void addListener(IntentBatchListener listener) {
317 listenerRegistry.addListener(listener);
318 }
319
320 @Override
321 public void removeListener(IntentBatchListener listener) {
322 listenerRegistry.removeListener(listener);
323 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800324}