blob: 085e9b706ea48539f3de6bd94147e5396ca68059 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Brian O'Connor72a034c2014-11-26 18:24:23 -080017
Madan Jampani8d21c792014-12-01 16:31:07 -080018import static com.google.common.base.Preconditions.checkNotNull;
19import static com.google.common.base.Preconditions.checkState;
20import static org.slf4j.LoggerFactory.getLogger;
21
22import java.util.Map;
Praseed Balakrishnan1516f482014-12-03 16:33:44 -080023import java.util.Objects;
Madan Jampani8d21c792014-12-01 16:31:07 -080024import java.util.Set;
alshabiba9819bf2014-11-30 18:15:52 -080025
Brian O'Connor72a034c2014-11-26 18:24:23 -080026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.LeadershipEvent;
34import org.onosproject.cluster.LeadershipEventListener;
35import org.onosproject.cluster.LeadershipService;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.CoreService;
39import org.onosproject.event.AbstractListenerRegistry;
40import org.onosproject.event.EventDeliveryService;
41import org.onosproject.net.intent.IntentBatchDelegate;
42import org.onosproject.net.intent.IntentBatchLeaderEvent;
43import org.onosproject.net.intent.IntentBatchListener;
44import org.onosproject.net.intent.IntentBatchService;
45import org.onosproject.net.intent.IntentOperations;
46import org.onosproject.store.hz.SQueue;
47import org.onosproject.store.hz.StoreService;
48import org.onosproject.store.serializers.KryoNamespaces;
49import org.onosproject.store.serializers.KryoSerializer;
50import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor72a034c2014-11-26 18:24:23 -080051import org.onlab.util.KryoNamespace;
52import org.slf4j.Logger;
53
Madan Jampani8d21c792014-12-01 16:31:07 -080054import com.google.common.collect.Maps;
55import com.google.common.collect.Sets;
56import com.hazelcast.core.HazelcastInstance;
57import com.hazelcast.core.IQueue;
58import com.hazelcast.core.ItemEvent;
59import com.hazelcast.core.ItemListener;
Brian O'Connor72a034c2014-11-26 18:24:23 -080060
61@Component(immediate = true)
62@Service
63public class HazelcastIntentBatchQueue
64 implements IntentBatchService {
65
66 private final Logger log = getLogger(getClass());
67 private static final String TOPIC_BASE = "intent-batch-";
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected CoreService coreService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected ClusterService clusterService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected LeadershipService leadershipService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected StoreService storeService;
80
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080081 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected EventDeliveryService eventDispatcher;
83
84
Brian O'Connor72a034c2014-11-26 18:24:23 -080085 private HazelcastInstance theInstance;
Madan Jampani8d21c792014-12-01 16:31:07 -080086 private NodeId localControllerNodeId;
Brian O'Connor72a034c2014-11-26 18:24:23 -080087 protected StoreSerializer serializer;
88 private IntentBatchDelegate delegate;
89 private InternalLeaderListener leaderListener = new InternalLeaderListener();
90 private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
Brian O'Connor44008532014-12-04 16:41:36 -080091 = Maps.newHashMap();
Brian O'Connor72a034c2014-11-26 18:24:23 -080092 private final Set<ApplicationId> myTopics = Sets.newHashSet();
93 private final Map<ApplicationId, IntentOperations> outstandingOps
94 = Maps.newHashMap();
95
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080096 private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
97 listenerRegistry = new AbstractListenerRegistry<>();
98
Brian O'Connor72a034c2014-11-26 18:24:23 -080099 @Activate
100 public void activate() {
101 theInstance = storeService.getHazelcastInstance();
Madan Jampani8d21c792014-12-01 16:31:07 -0800102 localControllerNodeId = clusterService.getLocalNode().id();
Brian O'Connor72a034c2014-11-26 18:24:23 -0800103 leadershipService.addListener(leaderListener);
104
105 serializer = new KryoSerializer() {
106
107 @Override
108 protected void setupKryoPool() {
109 serializerPool = KryoNamespace.newBuilder()
110 .setRegistrationRequired(false)
111 .register(KryoNamespaces.API)
112 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
113 .build();
114 }
115
116 };
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800117 eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800118 log.info("Started");
119 }
120
121 @Deactivate
122 public void deactivate() {
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800123 eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800124 leadershipService.removeListener(leaderListener);
alshabiba9819bf2014-11-30 18:15:52 -0800125 for (ApplicationId appId: batchQueues.keySet()) {
126 leadershipService.withdraw(getTopic(appId));
127 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800128 log.info("Stopped");
129 }
130
131 public static String getTopic(ApplicationId appId) {
132 return TOPIC_BASE + checkNotNull(appId.id());
133 }
134
135 public ApplicationId getAppId(String topic) {
136 checkState(topic.startsWith(TOPIC_BASE),
137 "Trying to get app id for invalid topic: {}", topic);
138 Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
139 return coreService.getAppId(id);
140 }
141
142 private SQueue<IntentOperations> getQueue(ApplicationId appId) {
143 SQueue<IntentOperations> queue = batchQueues.get(appId);
144 if (queue == null) {
145 synchronized (this) {
Brian O'Connor72a034c2014-11-26 18:24:23 -0800146 String topic = getTopic(appId);
147 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
148 queue = new SQueue<>(rawQueue, serializer);
alshabiba9819bf2014-11-30 18:15:52 -0800149 queue.addItemListener(new InternalItemListener(appId), false);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800150 batchQueues.putIfAbsent(appId, queue);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800151 leadershipService.runForLeadership(topic);
152 }
153 }
154 return queue;
155 }
156
157 @Override
158 public void addIntentOperations(IntentOperations ops) {
159 checkNotNull(ops, "Intent operations cannot be null.");
160 ApplicationId appId = ops.appId();
Brian O'Connor44008532014-12-04 16:41:36 -0800161 getQueue(appId).add(ops);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800162 dispatchNextOperation(appId);
163 }
164
165 @Override
166 public void removeIntentOperations(IntentOperations ops) {
167 ApplicationId appId = ops.appId();
168 synchronized (this) {
Praseed Balakrishnan1516f482014-12-03 16:33:44 -0800169 IntentOperations outstanding = outstandingOps.get(appId);
170 if (outstanding != null) {
171 checkState(Objects.equals(ops, outstanding),
172 "Operation {} does not match outstanding operation {}",
173 ops, outstanding);
174 } else {
175 log.warn("Operation {} not found", ops);
176 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800177 SQueue<IntentOperations> queue = batchQueues.get(appId);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800178 checkState(queue.remove().equals(ops),
179 "Operations are wrong.");
180 outstandingOps.remove(appId);
181 dispatchNextOperation(appId);
182 }
183 }
184
185 /**
186 * Dispatches the next available operations to the delegate, unless
187 * we are not the leader for this application id or there is an
188 * outstanding operations for this application id.
189 *
190 * @param appId application id
191 */
192 private void dispatchNextOperation(ApplicationId appId) {
193 synchronized (this) {
194 if (!myTopics.contains(appId) ||
195 outstandingOps.containsKey(appId)) {
196 return;
197 }
198 IntentOperations ops = batchQueues.get(appId).peek();
199 if (ops != null) {
200 outstandingOps.put(appId, ops);
201 delegate.execute(ops);
202 }
203 }
204 }
205
206 /**
207 * Record the leadership change for the given topic. If we have become the
208 * leader, then dispatch the next operations. If we have lost leadership,
209 * then cancel the last operations.
210 *
211 * @param topic topic based on application id
212 * @param leader true if we have become the leader, false otherwise
213 */
214 private void leaderChanged(String topic, boolean leader) {
215 ApplicationId appId = getAppId(topic);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800216 synchronized (this) {
217 if (leader) {
218 myTopics.add(appId);
219 checkState(!outstandingOps.containsKey(appId),
220 "Existing intent ops for app id: {}", appId);
221 dispatchNextOperation(appId);
222 } else {
223 myTopics.remove(appId);
224 IntentOperations ops = outstandingOps.get(appId);
225 if (ops != null) {
226 delegate.cancel(ops);
227 }
228 outstandingOps.remove(appId);
229 }
230 }
231 }
232
alshabiba9819bf2014-11-30 18:15:52 -0800233 private class InternalItemListener implements ItemListener<IntentOperations> {
234
235 private final ApplicationId appId;
236
237 public InternalItemListener(ApplicationId appId) {
238 this.appId = appId;
239 }
240
241 @Override
242 public void itemAdded(ItemEvent<IntentOperations> item) {
243 dispatchNextOperation(appId);
244 }
245
246 @Override
247 public void itemRemoved(ItemEvent<IntentOperations> item) {
248 // no-op
249 }
250 }
251
Brian O'Connor72a034c2014-11-26 18:24:23 -0800252 private class InternalLeaderListener implements LeadershipEventListener {
253 @Override
254 public void event(LeadershipEvent event) {
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800255 log.trace("Leadership Event: time = {} type = {} event = {}",
Brian O'Connor72a034c2014-11-26 18:24:23 -0800256 event.time(), event.type(), event);
257
258 String topic = event.subject().topic();
259 if (!topic.startsWith(TOPIC_BASE)) {
260 return; // Not our topic: ignore
261 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800262 if (!event.subject().leader().equals(localControllerNodeId)) {
alshabiba9819bf2014-11-30 18:15:52 -0800263 // run for leadership
264 getQueue(getAppId(topic));
Brian O'Connor72a034c2014-11-26 18:24:23 -0800265 return; // The event is not about this instance: ignore
266 }
267
268 switch (event.type()) {
269 case LEADER_ELECTED:
270 log.info("Elected leader for app {}", getAppId(topic));
271 leaderChanged(topic, true);
272 break;
273 case LEADER_BOOTED:
274 log.info("Lost leader election for app {}", getAppId(topic));
275 leaderChanged(topic, false);
276 break;
277 case LEADER_REELECTED:
278 break;
279 default:
280 break;
281 }
282 }
283 }
284
285 @Override
286 public Set<IntentOperations> getPendingOperations() {
287 Set<IntentOperations> ops = Sets.newHashSet();
288 synchronized (this) {
289 for (SQueue<IntentOperations> queue : batchQueues.values()) {
290 ops.addAll(queue);
291 }
292 return ops;
293 }
294 }
295
296 @Override
Brian O'Connor72a034c2014-11-26 18:24:23 -0800297 public boolean isLocalLeader(ApplicationId applicationId) {
298 return myTopics.contains(applicationId);
299 }
300
301 @Override
302 public void setDelegate(IntentBatchDelegate delegate) {
303 this.delegate = checkNotNull(delegate, "Delegate cannot be null");
304 }
305
306 @Override
307 public void unsetDelegate(IntentBatchDelegate delegate) {
308 if (this.delegate != null && this.delegate.equals(delegate)) {
309 this.delegate = null;
310 }
311 }
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800312
313 @Override
314 public void addListener(IntentBatchListener listener) {
315 listenerRegistry.addListener(listener);
316 }
317
318 @Override
319 public void removeListener(IntentBatchListener listener) {
320 listenerRegistry.removeListener(listener);
321 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800322}