blob: 52d166cb733b6985ba8913a2b47f272fa3723a71 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.intent.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import com.hazelcast.core.HazelcastInstance;
21import com.hazelcast.core.IQueue;
alshabiba9819bf2014-11-30 18:15:52 -080022import com.hazelcast.core.ItemEvent;
23import com.hazelcast.core.ItemListener;
24
Brian O'Connor72a034c2014-11-26 18:24:23 -080025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onlab.onos.cluster.ClusterService;
32import org.onlab.onos.cluster.ControllerNode;
33import org.onlab.onos.cluster.LeadershipEvent;
34import org.onlab.onos.cluster.LeadershipEventListener;
35import org.onlab.onos.cluster.LeadershipService;
36import org.onlab.onos.core.ApplicationId;
37import org.onlab.onos.core.CoreService;
38import org.onlab.onos.net.intent.IntentBatchDelegate;
39import org.onlab.onos.net.intent.IntentBatchService;
40import org.onlab.onos.net.intent.IntentOperations;
41import org.onlab.onos.store.hz.SQueue;
42import org.onlab.onos.store.hz.StoreService;
43import org.onlab.onos.store.serializers.KryoNamespaces;
44import org.onlab.onos.store.serializers.KryoSerializer;
45import org.onlab.onos.store.serializers.StoreSerializer;
46import org.onlab.util.KryoNamespace;
47import org.slf4j.Logger;
48
49import java.util.Collections;
50import java.util.Map;
51import java.util.Set;
52
53import static com.google.common.base.Preconditions.checkNotNull;
54import static com.google.common.base.Preconditions.checkState;
55import static org.slf4j.LoggerFactory.getLogger;
56
57@Component(immediate = true)
58@Service
59public class HazelcastIntentBatchQueue
60 implements IntentBatchService {
61
62 private final Logger log = getLogger(getClass());
63 private static final String TOPIC_BASE = "intent-batch-";
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected CoreService coreService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected ClusterService clusterService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected LeadershipService leadershipService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected StoreService storeService;
76
77 private HazelcastInstance theInstance;
78 private ControllerNode localControllerNode;
79 protected StoreSerializer serializer;
80 private IntentBatchDelegate delegate;
81 private InternalLeaderListener leaderListener = new InternalLeaderListener();
82 private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
83 = Maps.newHashMap(); // FIXME make distributed?
84 private final Set<ApplicationId> myTopics = Sets.newHashSet();
85 private final Map<ApplicationId, IntentOperations> outstandingOps
86 = Maps.newHashMap();
87
88 @Activate
89 public void activate() {
90 theInstance = storeService.getHazelcastInstance();
91 localControllerNode = clusterService.getLocalNode();
92 leadershipService.addListener(leaderListener);
93
94 serializer = new KryoSerializer() {
95
96 @Override
97 protected void setupKryoPool() {
98 serializerPool = KryoNamespace.newBuilder()
99 .setRegistrationRequired(false)
100 .register(KryoNamespaces.API)
101 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
102 .build();
103 }
104
105 };
106 log.info("Started");
107 }
108
109 @Deactivate
110 public void deactivate() {
111 leadershipService.removeListener(leaderListener);
alshabiba9819bf2014-11-30 18:15:52 -0800112 for (ApplicationId appId: batchQueues.keySet()) {
113 leadershipService.withdraw(getTopic(appId));
114 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800115 log.info("Stopped");
116 }
117
118 public static String getTopic(ApplicationId appId) {
119 return TOPIC_BASE + checkNotNull(appId.id());
120 }
121
122 public ApplicationId getAppId(String topic) {
123 checkState(topic.startsWith(TOPIC_BASE),
124 "Trying to get app id for invalid topic: {}", topic);
125 Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
126 return coreService.getAppId(id);
127 }
128
129 private SQueue<IntentOperations> getQueue(ApplicationId appId) {
130 SQueue<IntentOperations> queue = batchQueues.get(appId);
131 if (queue == null) {
132 synchronized (this) {
Brian O'Connor72a034c2014-11-26 18:24:23 -0800133 String topic = getTopic(appId);
134 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
135 queue = new SQueue<>(rawQueue, serializer);
alshabiba9819bf2014-11-30 18:15:52 -0800136 queue.addItemListener(new InternalItemListener(appId), false);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800137 batchQueues.putIfAbsent(appId, queue);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800138 leadershipService.runForLeadership(topic);
139 }
140 }
141 return queue;
142 }
143
144 @Override
145 public void addIntentOperations(IntentOperations ops) {
146 checkNotNull(ops, "Intent operations cannot be null.");
147 ApplicationId appId = ops.appId();
148 getQueue(appId).add(ops); // TODO consider using put here
149 dispatchNextOperation(appId);
150 }
151
152 @Override
153 public void removeIntentOperations(IntentOperations ops) {
154 ApplicationId appId = ops.appId();
155 synchronized (this) {
156 checkState(outstandingOps.get(appId).equals(ops),
157 "Operations not found.");
158 SQueue<IntentOperations> queue = batchQueues.get(appId);
159 // TODO consider alternatives to remove
160 checkState(queue.remove().equals(ops),
161 "Operations are wrong.");
162 outstandingOps.remove(appId);
163 dispatchNextOperation(appId);
164 }
165 }
166
167 /**
168 * Dispatches the next available operations to the delegate, unless
169 * we are not the leader for this application id or there is an
170 * outstanding operations for this application id.
171 *
172 * @param appId application id
173 */
174 private void dispatchNextOperation(ApplicationId appId) {
175 synchronized (this) {
176 if (!myTopics.contains(appId) ||
177 outstandingOps.containsKey(appId)) {
178 return;
179 }
180 IntentOperations ops = batchQueues.get(appId).peek();
181 if (ops != null) {
182 outstandingOps.put(appId, ops);
183 delegate.execute(ops);
184 }
185 }
186 }
187
188 /**
189 * Record the leadership change for the given topic. If we have become the
190 * leader, then dispatch the next operations. If we have lost leadership,
191 * then cancel the last operations.
192 *
193 * @param topic topic based on application id
194 * @param leader true if we have become the leader, false otherwise
195 */
196 private void leaderChanged(String topic, boolean leader) {
197 ApplicationId appId = getAppId(topic);
198 //TODO we are using the event caller's thread, should we use our own?
199 synchronized (this) {
200 if (leader) {
201 myTopics.add(appId);
202 checkState(!outstandingOps.containsKey(appId),
203 "Existing intent ops for app id: {}", appId);
204 dispatchNextOperation(appId);
205 } else {
206 myTopics.remove(appId);
207 IntentOperations ops = outstandingOps.get(appId);
208 if (ops != null) {
209 delegate.cancel(ops);
210 }
211 outstandingOps.remove(appId);
212 }
213 }
214 }
215
alshabiba9819bf2014-11-30 18:15:52 -0800216 private class InternalItemListener implements ItemListener<IntentOperations> {
217
218 private final ApplicationId appId;
219
220 public InternalItemListener(ApplicationId appId) {
221 this.appId = appId;
222 }
223
224 @Override
225 public void itemAdded(ItemEvent<IntentOperations> item) {
226 dispatchNextOperation(appId);
227 }
228
229 @Override
230 public void itemRemoved(ItemEvent<IntentOperations> item) {
231 // no-op
232 }
233 }
234
Brian O'Connor72a034c2014-11-26 18:24:23 -0800235 private class InternalLeaderListener implements LeadershipEventListener {
236 @Override
237 public void event(LeadershipEvent event) {
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800238 log.trace("Leadership Event: time = {} type = {} event = {}",
Brian O'Connor72a034c2014-11-26 18:24:23 -0800239 event.time(), event.type(), event);
240
241 String topic = event.subject().topic();
242 if (!topic.startsWith(TOPIC_BASE)) {
243 return; // Not our topic: ignore
244 }
245 if (!event.subject().leader().id().equals(localControllerNode.id())) {
alshabiba9819bf2014-11-30 18:15:52 -0800246 // run for leadership
247 getQueue(getAppId(topic));
Brian O'Connor72a034c2014-11-26 18:24:23 -0800248 return; // The event is not about this instance: ignore
249 }
250
251 switch (event.type()) {
252 case LEADER_ELECTED:
253 log.info("Elected leader for app {}", getAppId(topic));
254 leaderChanged(topic, true);
255 break;
256 case LEADER_BOOTED:
257 log.info("Lost leader election for app {}", getAppId(topic));
258 leaderChanged(topic, false);
259 break;
260 case LEADER_REELECTED:
261 break;
262 default:
263 break;
264 }
265 }
266 }
267
268 @Override
269 public Set<IntentOperations> getPendingOperations() {
270 Set<IntentOperations> ops = Sets.newHashSet();
271 synchronized (this) {
272 for (SQueue<IntentOperations> queue : batchQueues.values()) {
273 ops.addAll(queue);
274 }
275 return ops;
276 }
277 }
278
279 @Override
280 public Set<IntentOperations> getCurrentOperations() {
281 //FIXME this is not really implemented
282 return Collections.emptySet();
283 }
284
285 @Override
286 public boolean isLocalLeader(ApplicationId applicationId) {
287 return myTopics.contains(applicationId);
288 }
289
290 @Override
291 public void setDelegate(IntentBatchDelegate delegate) {
292 this.delegate = checkNotNull(delegate, "Delegate cannot be null");
293 }
294
295 @Override
296 public void unsetDelegate(IntentBatchDelegate delegate) {
297 if (this.delegate != null && this.delegate.equals(delegate)) {
298 this.delegate = null;
299 }
300 }
301}