/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.intent.impl;
17
18import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.net.intent.BatchWrite;
31import org.onosproject.net.intent.Intent;
32import org.onosproject.net.intent.IntentClockService;
33import org.onosproject.net.intent.IntentEvent;
34import org.onosproject.net.intent.IntentId;
35import org.onosproject.net.intent.IntentState;
36import org.onosproject.net.intent.IntentStore;
37import org.onosproject.net.intent.IntentStoreDelegate;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.Timestamp;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
41import org.onosproject.store.cluster.messaging.ClusterMessage;
42import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
43import org.onosproject.store.cluster.messaging.MessageSubject;
44import org.onosproject.store.impl.Timestamped;
45import org.onosproject.store.serializers.KryoSerializer;
46import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
47import org.slf4j.Logger;
48
49import java.io.IOException;
50import java.util.ArrayList;
51import java.util.List;
52import java.util.Set;
53import java.util.concurrent.ConcurrentHashMap;
54import java.util.concurrent.ConcurrentMap;
55import java.util.concurrent.ExecutorService;
56import java.util.concurrent.Executors;
57import java.util.concurrent.ScheduledExecutorService;
58import java.util.concurrent.TimeUnit;
59
60import static com.google.common.base.Preconditions.checkArgument;
61import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
62import static org.onlab.util.Tools.minPriority;
63import static org.onlab.util.Tools.namedThreads;
64import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
65import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
66import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
67import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
68import static org.slf4j.LoggerFactory.getLogger;
69
/**
 * Manages inventory of Intents in a distributed data store that uses optimistic
 * replication and gossip-based techniques.
 */
74@Component(immediate = true, enabled = false)
75@Service
76public class GossipIntentStore
77 extends AbstractStore<IntentEvent, IntentStoreDelegate>
78 implements IntentStore {
79
80 private final Logger log = getLogger(getClass());
81
82 private final ConcurrentMap<IntentId, Intent> intents =
83 new ConcurrentHashMap<>();
84
85 private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
86 = new ConcurrentHashMap<>();
87
88 private final Set<IntentId> withdrawRequestedIntents
89 = Sets.newConcurrentHashSet();
90
91 private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
92 = new ConcurrentHashMap<>();
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected IntentClockService intentClockService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected ClusterCommunicationService clusterCommunicator;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected ClusterService clusterService;
102
103 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
104 @Override
105 protected void setupKryoPool() {
106 serializerPool = KryoNamespace.newBuilder()
107 .register(DistributedStoreSerializers.STORE_COMMON)
108 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
109 .register(InternalIntentEvent.class)
110 .register(InternalSetInstallablesEvent.class)
111 //.register(InternalIntentAntiEntropyEvent.class)
112 //.register(IntentAntiEntropyAdvertisement.class)
113 .build();
114 }
115 };
116
117 private ExecutorService executor;
118
119 private ScheduledExecutorService backgroundExecutor;
120
121 // TODO: Make these anti-entropy params configurable
122 private long initialDelaySec = 5;
123 private long periodSec = 5;
124
125 @Activate
126 public void activate() {
127 clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
128 new InternalIntentCreateOrUpdateEventListener());
129 clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
130 new InternalIntentSetInstallablesListener());
131 clusterCommunicator.addSubscriber(
132 INTENT_ANTI_ENTROPY_ADVERTISEMENT,
133 new InternalIntentAntiEntropyAdvertisementListener());
134
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800135 executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
Jonathan Hart5573d322015-01-21 10:13:25 -0800136
137 backgroundExecutor =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800138 newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
Jonathan Hart5573d322015-01-21 10:13:25 -0800139
140 // start anti-entropy thread
141 //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
142 //initialDelaySec, periodSec, TimeUnit.SECONDS);
143
144 log.info("Started");
145 }
146
147 @Deactivate
148 public void deactivate() {
149 executor.shutdownNow();
150 backgroundExecutor.shutdownNow();
151 try {
152 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
153 log.error("Timeout during executor shutdown");
154 }
155 } catch (InterruptedException e) {
156 log.error("Error during executor shutdown", e);
157 }
158
159 intents.clear();
160
161 log.info("Stopped");
162 }
163
164 @Override
165 public long getIntentCount() {
166 return intents.size();
167 }
168
169 @Override
170 public Iterable<Intent> getIntents() {
171 // TODO don't actually need to copy intents, they are immutable
172 return ImmutableList.copyOf(intents.values());
173 }
174
175 @Override
176 public Intent getIntent(IntentId intentId) {
177 return intents.get(intentId);
178 }
179
180 @Override
181 public IntentState getIntentState(IntentId intentId) {
182 Timestamped<IntentState> state = intentStates.get(intentId);
183 if (state != null) {
184 return state.value();
185 }
186 return null;
187 }
188
Jonathan Hart5573d322015-01-21 10:13:25 -0800189 private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
190 switch (newState) {
191 case WITHDRAW_REQ:
192 withdrawRequestedIntents.add(intentId);
193 break;
194 case INSTALL_REQ:
195 case COMPILING:
196 case INSTALLING:
197 case INSTALLED:
198 case RECOMPILING:
199 case WITHDRAWING:
200 case WITHDRAWN:
201 case FAILED:
202 synchronized (intentStates) {
203 Timestamped<IntentState> existing = intentStates.get(intentId);
204 if (existing == null || !existing.isNewer(timestamp)) {
205 intentStates.put(intentId, new Timestamped<>(newState, timestamp));
206 }
207 }
208 break;
209 default:
210 log.warn("Unknown intent state {}", newState);
211 break;
212 }
213
214 try {
215 // TODO make sure it's OK if the intent is null
216 return IntentEvent.getEvent(newState, intents.get(intentId));
217 } catch (IllegalArgumentException e) {
218 // Transient states can't be used for events, so don't send one
219 return null;
220 }
221 }
222
Jonathan Hart5573d322015-01-21 10:13:25 -0800223 private void setInstallableIntentsInternal(IntentId intentId,
224 List<Intent> installableIntents,
225 Timestamp timestamp) {
226 synchronized (installables) {
227 Timestamped<List<Intent>> existing = installables.get(intentId);
228 if (existing == null || !existing.isNewer(timestamp)) {
229 installables.put(intentId,
230 new Timestamped<>(installableIntents, timestamp));
231 }
232 }
233 }
234
235 @Override
236 public List<Intent> getInstallableIntents(IntentId intentId) {
237 Timestamped<List<Intent>> tInstallables = installables.get(intentId);
238 if (tInstallables != null) {
239 return tInstallables.value();
240 }
241 return null;
242 }
243
244 @Override
Jonathan Hart5573d322015-01-21 10:13:25 -0800245 public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
246
247 List<IntentEvent> events = Lists.newArrayList();
248 List<BatchWrite.Operation> failed = new ArrayList<>();
249
250 for (BatchWrite.Operation op : batch.operations()) {
251 switch (op.type()) {
252 case CREATE_INTENT:
253 checkArgument(op.args().size() == 1,
254 "CREATE_INTENT takes 1 argument. %s", op);
255 Intent intent = op.arg(0);
256
257 events.add(createIntentInternal(intent));
258 notifyPeers(new InternalIntentEvent(
259 intent.id(), intent, INSTALL_REQ, null));
260
261 break;
262 case REMOVE_INTENT:
263 checkArgument(op.args().size() == 1,
264 "REMOVE_INTENT takes 1 argument. %s", op);
265 IntentId intentId = (IntentId) op.arg(0);
266 // TODO implement
267
268 break;
269 case SET_STATE:
270 checkArgument(op.args().size() == 2,
271 "SET_STATE takes 2 arguments. %s", op);
272 intent = op.arg(0);
273 IntentState newState = op.arg(1);
274
275 Timestamp timestamp = intentClockService.getTimestamp(
276 intent.id());
277 IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
278 events.add(externalEvent);
279 notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
280
281 break;
282 case SET_INSTALLABLE:
283 checkArgument(op.args().size() == 2,
284 "SET_INSTALLABLE takes 2 arguments. %s", op);
285 intentId = op.arg(0);
286 List<Intent> installableIntents = op.arg(1);
287
288 Timestamp timestamp1 = intentClockService.getTimestamp(intentId);
289 setInstallableIntentsInternal(
290 intentId, installableIntents, timestamp1);
291
292 notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp1));
293
294 break;
295 case REMOVE_INSTALLED:
296 checkArgument(op.args().size() == 1,
297 "REMOVE_INSTALLED takes 1 argument. %s", op);
298 intentId = op.arg(0);
299 // TODO implement
300 break;
301 default:
302 log.warn("Unknown Operation encountered: {}", op);
303 failed.add(op);
304 break;
305 }
306 }
307
308 notifyDelegate(events);
309 return failed;
310 }
311
312 private IntentEvent createIntentInternal(Intent intent) {
313 Intent oldValue = intents.putIfAbsent(intent.id(), intent);
314 if (oldValue == null) {
315 return IntentEvent.getEvent(INSTALL_REQ, intent);
316 }
317
318 log.warn("Intent ID {} already in store, throwing new update away",
319 intent.id());
320 return null;
321 }
322
323 private void notifyPeers(InternalIntentEvent event) {
324 try {
325 broadcastMessage(INTENT_UPDATED_MSG, event);
326 } catch (IOException e) {
327 // TODO this won't happen; remove from API
328 log.debug("IOException broadcasting update", e);
329 }
330 }
331
332 private void notifyPeers(InternalSetInstallablesEvent event) {
333 try {
334 broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
335 } catch (IOException e) {
336 // TODO this won't happen; remove from API
337 log.debug("IOException broadcasting update", e);
338 }
339 }
340
341 private void broadcastMessage(MessageSubject subject, Object event) throws
342 IOException {
343 ClusterMessage message = new ClusterMessage(
344 clusterService.getLocalNode().id(),
345 subject,
346 SERIALIZER.encode(event));
347 clusterCommunicator.broadcast(message);
348 }
349
350 private void unicastMessage(NodeId peer,
351 MessageSubject subject,
352 Object event) throws IOException {
353 ClusterMessage message = new ClusterMessage(
354 clusterService.getLocalNode().id(),
355 subject,
356 SERIALIZER.encode(event));
357 clusterCommunicator.unicast(message, peer);
358 }
359
360 private void notifyDelegateIfNotNull(IntentEvent event) {
361 if (event != null) {
362 notifyDelegate(event);
363 }
364 }
365
366 private final class InternalIntentCreateOrUpdateEventListener
367 implements ClusterMessageHandler {
368 @Override
369 public void handle(ClusterMessage message) {
370
371 log.debug("Received intent update event from peer: {}", message.sender());
372 InternalIntentEvent event = SERIALIZER.decode(message.payload());
373
374 IntentId intentId = event.intentId();
375 Intent intent = event.intent();
376 IntentState state = event.state();
377 Timestamp timestamp = event.timestamp();
378
379 executor.submit(() -> {
380 try {
381 switch (state) {
382 case INSTALL_REQ:
383 notifyDelegateIfNotNull(createIntentInternal(intent));
384 break;
385 default:
386 notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
387 break;
388 }
389 } catch (Exception e) {
390 log.warn("Exception thrown handling intent create or update", e);
391 }
392 });
393 }
394 }
395
396 private final class InternalIntentSetInstallablesListener
397 implements ClusterMessageHandler {
398 @Override
399 public void handle(ClusterMessage message) {
400 log.debug("Received intent set installables event from peer: {}", message.sender());
401 InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
402
403 IntentId intentId = event.intentId();
404 List<Intent> installables = event.installables();
405 Timestamp timestamp = event.timestamp();
406
407 executor.submit(() -> {
408 try {
409 setInstallableIntentsInternal(intentId, installables, timestamp);
410 } catch (Exception e) {
411 log.warn("Exception thrown handling intent set installables", e);
412 }
413 });
414 }
415 }
416
417 private final class InternalIntentAntiEntropyAdvertisementListener
418 implements ClusterMessageHandler {
419
420 @Override
421 public void handle(ClusterMessage message) {
422 log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
423 // TODO implement
424 //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
425 backgroundExecutor.submit(() -> {
426 try {
427 log.debug("something");
428 //handleAntiEntropyAdvertisement(advertisement);
429 } catch (Exception e) {
430 log.warn("Exception thrown handling intent advertisements", e);
431 }
432 });
433 }
434 }
435}
436