blob: 6b4f8027a88fe801025a2f730a74c66d2e734191 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.intent.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import com.hazelcast.core.HazelcastInstance;
21import com.hazelcast.core.IQueue;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.onos.cluster.ClusterService;
29import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.cluster.LeadershipEvent;
31import org.onlab.onos.cluster.LeadershipEventListener;
32import org.onlab.onos.cluster.LeadershipService;
33import org.onlab.onos.core.ApplicationId;
34import org.onlab.onos.core.CoreService;
35import org.onlab.onos.net.intent.IntentBatchDelegate;
36import org.onlab.onos.net.intent.IntentBatchService;
37import org.onlab.onos.net.intent.IntentOperations;
38import org.onlab.onos.store.hz.SQueue;
39import org.onlab.onos.store.hz.StoreService;
40import org.onlab.onos.store.serializers.KryoNamespaces;
41import org.onlab.onos.store.serializers.KryoSerializer;
42import org.onlab.onos.store.serializers.StoreSerializer;
43import org.onlab.util.KryoNamespace;
44import org.slf4j.Logger;
45
46import java.util.Collections;
47import java.util.Map;
48import java.util.Set;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51import static com.google.common.base.Preconditions.checkState;
52import static org.slf4j.LoggerFactory.getLogger;
53
54@Component(immediate = true)
55@Service
56public class HazelcastIntentBatchQueue
57 implements IntentBatchService {
58
59 private final Logger log = getLogger(getClass());
60 private static final String TOPIC_BASE = "intent-batch-";
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected CoreService coreService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClusterService clusterService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected LeadershipService leadershipService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected StoreService storeService;
73
74 private HazelcastInstance theInstance;
75 private ControllerNode localControllerNode;
76 protected StoreSerializer serializer;
77 private IntentBatchDelegate delegate;
78 private InternalLeaderListener leaderListener = new InternalLeaderListener();
79 private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
80 = Maps.newHashMap(); // FIXME make distributed?
81 private final Set<ApplicationId> myTopics = Sets.newHashSet();
82 private final Map<ApplicationId, IntentOperations> outstandingOps
83 = Maps.newHashMap();
84
85 @Activate
86 public void activate() {
87 theInstance = storeService.getHazelcastInstance();
88 localControllerNode = clusterService.getLocalNode();
89 leadershipService.addListener(leaderListener);
90
91 serializer = new KryoSerializer() {
92
93 @Override
94 protected void setupKryoPool() {
95 serializerPool = KryoNamespace.newBuilder()
96 .setRegistrationRequired(false)
97 .register(KryoNamespaces.API)
98 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
99 .build();
100 }
101
102 };
103 log.info("Started");
104 }
105
106 @Deactivate
107 public void deactivate() {
108 leadershipService.removeListener(leaderListener);
109 log.info("Stopped");
110 }
111
112 public static String getTopic(ApplicationId appId) {
113 return TOPIC_BASE + checkNotNull(appId.id());
114 }
115
116 public ApplicationId getAppId(String topic) {
117 checkState(topic.startsWith(TOPIC_BASE),
118 "Trying to get app id for invalid topic: {}", topic);
119 Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
120 return coreService.getAppId(id);
121 }
122
123 private SQueue<IntentOperations> getQueue(ApplicationId appId) {
124 SQueue<IntentOperations> queue = batchQueues.get(appId);
125 if (queue == null) {
126 synchronized (this) {
127 // FIXME how will other instances find out about new queues
128 String topic = getTopic(appId);
129 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
130 queue = new SQueue<>(rawQueue, serializer);
131 batchQueues.putIfAbsent(appId, queue);
132 // TODO others should run for leadership when they hear about this topic
133 leadershipService.runForLeadership(topic);
134 }
135 }
136 return queue;
137 }
138
139 @Override
140 public void addIntentOperations(IntentOperations ops) {
141 checkNotNull(ops, "Intent operations cannot be null.");
142 ApplicationId appId = ops.appId();
143 getQueue(appId).add(ops); // TODO consider using put here
144 dispatchNextOperation(appId);
145 }
146
147 @Override
148 public void removeIntentOperations(IntentOperations ops) {
149 ApplicationId appId = ops.appId();
150 synchronized (this) {
151 checkState(outstandingOps.get(appId).equals(ops),
152 "Operations not found.");
153 SQueue<IntentOperations> queue = batchQueues.get(appId);
154 // TODO consider alternatives to remove
155 checkState(queue.remove().equals(ops),
156 "Operations are wrong.");
157 outstandingOps.remove(appId);
158 dispatchNextOperation(appId);
159 }
160 }
161
162 /**
163 * Dispatches the next available operations to the delegate, unless
164 * we are not the leader for this application id or there is an
165 * outstanding operations for this application id.
166 *
167 * @param appId application id
168 */
169 private void dispatchNextOperation(ApplicationId appId) {
170 synchronized (this) {
171 if (!myTopics.contains(appId) ||
172 outstandingOps.containsKey(appId)) {
173 return;
174 }
175 IntentOperations ops = batchQueues.get(appId).peek();
176 if (ops != null) {
177 outstandingOps.put(appId, ops);
178 delegate.execute(ops);
179 }
180 }
181 }
182
183 /**
184 * Record the leadership change for the given topic. If we have become the
185 * leader, then dispatch the next operations. If we have lost leadership,
186 * then cancel the last operations.
187 *
188 * @param topic topic based on application id
189 * @param leader true if we have become the leader, false otherwise
190 */
191 private void leaderChanged(String topic, boolean leader) {
192 ApplicationId appId = getAppId(topic);
193 //TODO we are using the event caller's thread, should we use our own?
194 synchronized (this) {
195 if (leader) {
196 myTopics.add(appId);
197 checkState(!outstandingOps.containsKey(appId),
198 "Existing intent ops for app id: {}", appId);
199 dispatchNextOperation(appId);
200 } else {
201 myTopics.remove(appId);
202 IntentOperations ops = outstandingOps.get(appId);
203 if (ops != null) {
204 delegate.cancel(ops);
205 }
206 outstandingOps.remove(appId);
207 }
208 }
209 }
210
211 private class InternalLeaderListener implements LeadershipEventListener {
212 @Override
213 public void event(LeadershipEvent event) {
214 log.debug("Leadership Event: time = {} type = {} event = {}",
215 event.time(), event.type(), event);
216
217 String topic = event.subject().topic();
218 if (!topic.startsWith(TOPIC_BASE)) {
219 return; // Not our topic: ignore
220 }
221 if (!event.subject().leader().id().equals(localControllerNode.id())) {
222 return; // The event is not about this instance: ignore
223 }
224
225 switch (event.type()) {
226 case LEADER_ELECTED:
227 log.info("Elected leader for app {}", getAppId(topic));
228 leaderChanged(topic, true);
229 break;
230 case LEADER_BOOTED:
231 log.info("Lost leader election for app {}", getAppId(topic));
232 leaderChanged(topic, false);
233 break;
234 case LEADER_REELECTED:
235 break;
236 default:
237 break;
238 }
239 }
240 }
241
242 @Override
243 public Set<IntentOperations> getPendingOperations() {
244 Set<IntentOperations> ops = Sets.newHashSet();
245 synchronized (this) {
246 for (SQueue<IntentOperations> queue : batchQueues.values()) {
247 ops.addAll(queue);
248 }
249 return ops;
250 }
251 }
252
253 @Override
254 public Set<IntentOperations> getCurrentOperations() {
255 //FIXME this is not really implemented
256 return Collections.emptySet();
257 }
258
259 @Override
260 public boolean isLocalLeader(ApplicationId applicationId) {
261 return myTopics.contains(applicationId);
262 }
263
264 @Override
265 public void setDelegate(IntentBatchDelegate delegate) {
266 this.delegate = checkNotNull(delegate, "Delegate cannot be null");
267 }
268
269 @Override
270 public void unsetDelegate(IntentBatchDelegate delegate) {
271 if (this.delegate != null && this.delegate.equals(delegate)) {
272 this.delegate = null;
273 }
274 }
275}