/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.intent.impl;
17
18import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.net.intent.BatchWrite;
31import org.onosproject.net.intent.Intent;
32import org.onosproject.net.intent.IntentClockService;
33import org.onosproject.net.intent.IntentEvent;
34import org.onosproject.net.intent.IntentId;
35import org.onosproject.net.intent.IntentState;
36import org.onosproject.net.intent.IntentStore;
37import org.onosproject.net.intent.IntentStoreDelegate;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.Timestamp;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
41import org.onosproject.store.cluster.messaging.ClusterMessage;
42import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
43import org.onosproject.store.cluster.messaging.MessageSubject;
44import org.onosproject.store.impl.Timestamped;
45import org.onosproject.store.serializers.KryoSerializer;
46import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
47import org.slf4j.Logger;
48
49import java.io.IOException;
50import java.util.ArrayList;
51import java.util.List;
52import java.util.Set;
53import java.util.concurrent.ConcurrentHashMap;
54import java.util.concurrent.ConcurrentMap;
55import java.util.concurrent.ExecutorService;
56import java.util.concurrent.Executors;
57import java.util.concurrent.ScheduledExecutorService;
58import java.util.concurrent.TimeUnit;
59
60import static com.google.common.base.Preconditions.checkArgument;
61import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
62import static org.onlab.util.Tools.minPriority;
63import static org.onlab.util.Tools.namedThreads;
64import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
65import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
66import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
67import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
68import static org.slf4j.LoggerFactory.getLogger;
69
70/**
71 * Manages inventory of Intents in a distributed data store that uses optimistic
72 * replication and gossip based techniques.
73 */
74@Component(immediate = true, enabled = false)
75@Service
76public class GossipIntentStore
77 extends AbstractStore<IntentEvent, IntentStoreDelegate>
78 implements IntentStore {
79
80 private final Logger log = getLogger(getClass());
81
82 private final ConcurrentMap<IntentId, Intent> intents =
83 new ConcurrentHashMap<>();
84
85 private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
86 = new ConcurrentHashMap<>();
87
88 private final Set<IntentId> withdrawRequestedIntents
89 = Sets.newConcurrentHashSet();
90
91 private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
92 = new ConcurrentHashMap<>();
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected IntentClockService intentClockService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected ClusterCommunicationService clusterCommunicator;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected ClusterService clusterService;
102
103 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
104 @Override
105 protected void setupKryoPool() {
106 serializerPool = KryoNamespace.newBuilder()
107 .register(DistributedStoreSerializers.STORE_COMMON)
108 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
109 .register(InternalIntentEvent.class)
110 .register(InternalSetInstallablesEvent.class)
111 //.register(InternalIntentAntiEntropyEvent.class)
112 //.register(IntentAntiEntropyAdvertisement.class)
113 .build();
114 }
115 };
116
117 private ExecutorService executor;
118
119 private ScheduledExecutorService backgroundExecutor;
120
121 // TODO: Make these anti-entropy params configurable
122 private long initialDelaySec = 5;
123 private long periodSec = 5;
124
125 @Activate
126 public void activate() {
127 clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
128 new InternalIntentCreateOrUpdateEventListener());
129 clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
130 new InternalIntentSetInstallablesListener());
131 clusterCommunicator.addSubscriber(
132 INTENT_ANTI_ENTROPY_ADVERTISEMENT,
133 new InternalIntentAntiEntropyAdvertisementListener());
134
135 executor = Executors.newCachedThreadPool(namedThreads("intent-fg-%d"));
136
137 backgroundExecutor =
138 newSingleThreadScheduledExecutor(minPriority(namedThreads("intent-bg-%d")));
139
140 // start anti-entropy thread
141 //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
142 //initialDelaySec, periodSec, TimeUnit.SECONDS);
143
144 log.info("Started");
145 }
146
147 @Deactivate
148 public void deactivate() {
149 executor.shutdownNow();
150 backgroundExecutor.shutdownNow();
151 try {
152 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
153 log.error("Timeout during executor shutdown");
154 }
155 } catch (InterruptedException e) {
156 log.error("Error during executor shutdown", e);
157 }
158
159 intents.clear();
160
161 log.info("Stopped");
162 }
163
164 @Override
165 public long getIntentCount() {
166 return intents.size();
167 }
168
169 @Override
170 public Iterable<Intent> getIntents() {
171 // TODO don't actually need to copy intents, they are immutable
172 return ImmutableList.copyOf(intents.values());
173 }
174
175 @Override
176 public Intent getIntent(IntentId intentId) {
177 return intents.get(intentId);
178 }
179
180 @Override
181 public IntentState getIntentState(IntentId intentId) {
182 Timestamped<IntentState> state = intentStates.get(intentId);
183 if (state != null) {
184 return state.value();
185 }
186 return null;
187 }
188
189 @Override
190 public void setState(Intent intent, IntentState newState) {
191 // TODO implement
192 }
193
194 private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
195 switch (newState) {
196 case WITHDRAW_REQ:
197 withdrawRequestedIntents.add(intentId);
198 break;
199 case INSTALL_REQ:
200 case COMPILING:
201 case INSTALLING:
202 case INSTALLED:
203 case RECOMPILING:
204 case WITHDRAWING:
205 case WITHDRAWN:
206 case FAILED:
207 synchronized (intentStates) {
208 Timestamped<IntentState> existing = intentStates.get(intentId);
209 if (existing == null || !existing.isNewer(timestamp)) {
210 intentStates.put(intentId, new Timestamped<>(newState, timestamp));
211 }
212 }
213 break;
214 default:
215 log.warn("Unknown intent state {}", newState);
216 break;
217 }
218
219 try {
220 // TODO make sure it's OK if the intent is null
221 return IntentEvent.getEvent(newState, intents.get(intentId));
222 } catch (IllegalArgumentException e) {
223 // Transient states can't be used for events, so don't send one
224 return null;
225 }
226 }
227
228 @Override
229 public void setInstallableIntents(IntentId intentId,
230 List<Intent> installableIntents) {
231 // TODO implement
232 }
233
234 private void setInstallableIntentsInternal(IntentId intentId,
235 List<Intent> installableIntents,
236 Timestamp timestamp) {
237 synchronized (installables) {
238 Timestamped<List<Intent>> existing = installables.get(intentId);
239 if (existing == null || !existing.isNewer(timestamp)) {
240 installables.put(intentId,
241 new Timestamped<>(installableIntents, timestamp));
242 }
243 }
244 }
245
246 @Override
247 public List<Intent> getInstallableIntents(IntentId intentId) {
248 Timestamped<List<Intent>> tInstallables = installables.get(intentId);
249 if (tInstallables != null) {
250 return tInstallables.value();
251 }
252 return null;
253 }
254
255 @Override
256 public void removeInstalledIntents(IntentId intentId) {
257 // TODO implement
258 }
259
260 @Override
261 public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
262
263 List<IntentEvent> events = Lists.newArrayList();
264 List<BatchWrite.Operation> failed = new ArrayList<>();
265
266 for (BatchWrite.Operation op : batch.operations()) {
267 switch (op.type()) {
268 case CREATE_INTENT:
269 checkArgument(op.args().size() == 1,
270 "CREATE_INTENT takes 1 argument. %s", op);
271 Intent intent = op.arg(0);
272
273 events.add(createIntentInternal(intent));
274 notifyPeers(new InternalIntentEvent(
275 intent.id(), intent, INSTALL_REQ, null));
276
277 break;
278 case REMOVE_INTENT:
279 checkArgument(op.args().size() == 1,
280 "REMOVE_INTENT takes 1 argument. %s", op);
281 IntentId intentId = (IntentId) op.arg(0);
282 // TODO implement
283
284 break;
285 case SET_STATE:
286 checkArgument(op.args().size() == 2,
287 "SET_STATE takes 2 arguments. %s", op);
288 intent = op.arg(0);
289 IntentState newState = op.arg(1);
290
291 Timestamp timestamp = intentClockService.getTimestamp(
292 intent.id());
293 IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
294 events.add(externalEvent);
295 notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
296
297 break;
298 case SET_INSTALLABLE:
299 checkArgument(op.args().size() == 2,
300 "SET_INSTALLABLE takes 2 arguments. %s", op);
301 intentId = op.arg(0);
302 List<Intent> installableIntents = op.arg(1);
303
304 Timestamp timestamp1 = intentClockService.getTimestamp(intentId);
305 setInstallableIntentsInternal(
306 intentId, installableIntents, timestamp1);
307
308 notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp1));
309
310 break;
311 case REMOVE_INSTALLED:
312 checkArgument(op.args().size() == 1,
313 "REMOVE_INSTALLED takes 1 argument. %s", op);
314 intentId = op.arg(0);
315 // TODO implement
316 break;
317 default:
318 log.warn("Unknown Operation encountered: {}", op);
319 failed.add(op);
320 break;
321 }
322 }
323
324 notifyDelegate(events);
325 return failed;
326 }
327
328 private IntentEvent createIntentInternal(Intent intent) {
329 Intent oldValue = intents.putIfAbsent(intent.id(), intent);
330 if (oldValue == null) {
331 return IntentEvent.getEvent(INSTALL_REQ, intent);
332 }
333
334 log.warn("Intent ID {} already in store, throwing new update away",
335 intent.id());
336 return null;
337 }
338
339 private void notifyPeers(InternalIntentEvent event) {
340 try {
341 broadcastMessage(INTENT_UPDATED_MSG, event);
342 } catch (IOException e) {
343 // TODO this won't happen; remove from API
344 log.debug("IOException broadcasting update", e);
345 }
346 }
347
348 private void notifyPeers(InternalSetInstallablesEvent event) {
349 try {
350 broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
351 } catch (IOException e) {
352 // TODO this won't happen; remove from API
353 log.debug("IOException broadcasting update", e);
354 }
355 }
356
357 private void broadcastMessage(MessageSubject subject, Object event) throws
358 IOException {
359 ClusterMessage message = new ClusterMessage(
360 clusterService.getLocalNode().id(),
361 subject,
362 SERIALIZER.encode(event));
363 clusterCommunicator.broadcast(message);
364 }
365
366 private void unicastMessage(NodeId peer,
367 MessageSubject subject,
368 Object event) throws IOException {
369 ClusterMessage message = new ClusterMessage(
370 clusterService.getLocalNode().id(),
371 subject,
372 SERIALIZER.encode(event));
373 clusterCommunicator.unicast(message, peer);
374 }
375
376 private void notifyDelegateIfNotNull(IntentEvent event) {
377 if (event != null) {
378 notifyDelegate(event);
379 }
380 }
381
382 private final class InternalIntentCreateOrUpdateEventListener
383 implements ClusterMessageHandler {
384 @Override
385 public void handle(ClusterMessage message) {
386
387 log.debug("Received intent update event from peer: {}", message.sender());
388 InternalIntentEvent event = SERIALIZER.decode(message.payload());
389
390 IntentId intentId = event.intentId();
391 Intent intent = event.intent();
392 IntentState state = event.state();
393 Timestamp timestamp = event.timestamp();
394
395 executor.submit(() -> {
396 try {
397 switch (state) {
398 case INSTALL_REQ:
399 notifyDelegateIfNotNull(createIntentInternal(intent));
400 break;
401 default:
402 notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
403 break;
404 }
405 } catch (Exception e) {
406 log.warn("Exception thrown handling intent create or update", e);
407 }
408 });
409 }
410 }
411
412 private final class InternalIntentSetInstallablesListener
413 implements ClusterMessageHandler {
414 @Override
415 public void handle(ClusterMessage message) {
416 log.debug("Received intent set installables event from peer: {}", message.sender());
417 InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
418
419 IntentId intentId = event.intentId();
420 List<Intent> installables = event.installables();
421 Timestamp timestamp = event.timestamp();
422
423 executor.submit(() -> {
424 try {
425 setInstallableIntentsInternal(intentId, installables, timestamp);
426 } catch (Exception e) {
427 log.warn("Exception thrown handling intent set installables", e);
428 }
429 });
430 }
431 }
432
433 private final class InternalIntentAntiEntropyAdvertisementListener
434 implements ClusterMessageHandler {
435
436 @Override
437 public void handle(ClusterMessage message) {
438 log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
439 // TODO implement
440 //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
441 backgroundExecutor.submit(() -> {
442 try {
443 log.debug("something");
444 //handleAntiEntropyAdvertisement(advertisement);
445 } catch (Exception e) {
446 log.warn("Exception thrown handling intent advertisements", e);
447 }
448 });
449 }
450 }
451}
452