blob: 4ffd4bf5a8575466c46e51d2d3d31ce021667945 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.intent.impl;
17
18import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.net.intent.BatchWrite;
31import org.onosproject.net.intent.Intent;
32import org.onosproject.net.intent.IntentClockService;
33import org.onosproject.net.intent.IntentEvent;
34import org.onosproject.net.intent.IntentId;
35import org.onosproject.net.intent.IntentState;
36import org.onosproject.net.intent.IntentStore;
37import org.onosproject.net.intent.IntentStoreDelegate;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.Timestamp;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
41import org.onosproject.store.cluster.messaging.ClusterMessage;
42import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
43import org.onosproject.store.cluster.messaging.MessageSubject;
44import org.onosproject.store.impl.Timestamped;
45import org.onosproject.store.serializers.KryoSerializer;
46import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
47import org.slf4j.Logger;
48
49import java.io.IOException;
50import java.util.ArrayList;
Jonathan Hart229794b2015-01-23 16:15:07 -080051import java.util.Collections;
Jonathan Hart5573d322015-01-21 10:13:25 -080052import java.util.List;
53import java.util.Set;
54import java.util.concurrent.ConcurrentHashMap;
55import java.util.concurrent.ConcurrentMap;
56import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58import java.util.concurrent.ScheduledExecutorService;
59import java.util.concurrent.TimeUnit;
60
61import static com.google.common.base.Preconditions.checkArgument;
62import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
63import static org.onlab.util.Tools.minPriority;
64import static org.onlab.util.Tools.namedThreads;
65import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
66import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
67import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
68import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
69import static org.slf4j.LoggerFactory.getLogger;
70
71/**
72 * Manages inventory of Intents in a distributed data store that uses optimistic
73 * replication and gossip based techniques.
74 */
75@Component(immediate = true, enabled = false)
76@Service
77public class GossipIntentStore
78 extends AbstractStore<IntentEvent, IntentStoreDelegate>
79 implements IntentStore {
80
81 private final Logger log = getLogger(getClass());
82
83 private final ConcurrentMap<IntentId, Intent> intents =
84 new ConcurrentHashMap<>();
85
86 private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
87 = new ConcurrentHashMap<>();
88
89 private final Set<IntentId> withdrawRequestedIntents
90 = Sets.newConcurrentHashSet();
91
92 private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
93 = new ConcurrentHashMap<>();
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected IntentClockService intentClockService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService clusterCommunicator;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterService clusterService;
103
104 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
105 @Override
106 protected void setupKryoPool() {
107 serializerPool = KryoNamespace.newBuilder()
108 .register(DistributedStoreSerializers.STORE_COMMON)
109 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
110 .register(InternalIntentEvent.class)
111 .register(InternalSetInstallablesEvent.class)
Jonathan Hart229794b2015-01-23 16:15:07 -0800112 .register(Collections.emptyList().getClass())
Jonathan Hart5573d322015-01-21 10:13:25 -0800113 //.register(InternalIntentAntiEntropyEvent.class)
114 //.register(IntentAntiEntropyAdvertisement.class)
115 .build();
116 }
117 };
118
119 private ExecutorService executor;
120
121 private ScheduledExecutorService backgroundExecutor;
122
123 // TODO: Make these anti-entropy params configurable
124 private long initialDelaySec = 5;
125 private long periodSec = 5;
126
127 @Activate
128 public void activate() {
129 clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
130 new InternalIntentCreateOrUpdateEventListener());
131 clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
132 new InternalIntentSetInstallablesListener());
133 clusterCommunicator.addSubscriber(
134 INTENT_ANTI_ENTROPY_ADVERTISEMENT,
135 new InternalIntentAntiEntropyAdvertisementListener());
136
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800137 executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
Jonathan Hart5573d322015-01-21 10:13:25 -0800138
139 backgroundExecutor =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800140 newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
Jonathan Hart5573d322015-01-21 10:13:25 -0800141
142 // start anti-entropy thread
143 //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
144 //initialDelaySec, periodSec, TimeUnit.SECONDS);
145
146 log.info("Started");
147 }
148
149 @Deactivate
150 public void deactivate() {
151 executor.shutdownNow();
152 backgroundExecutor.shutdownNow();
153 try {
154 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
155 log.error("Timeout during executor shutdown");
156 }
157 } catch (InterruptedException e) {
158 log.error("Error during executor shutdown", e);
159 }
160
161 intents.clear();
162
163 log.info("Stopped");
164 }
165
166 @Override
167 public long getIntentCount() {
168 return intents.size();
169 }
170
171 @Override
172 public Iterable<Intent> getIntents() {
173 // TODO don't actually need to copy intents, they are immutable
174 return ImmutableList.copyOf(intents.values());
175 }
176
177 @Override
178 public Intent getIntent(IntentId intentId) {
179 return intents.get(intentId);
180 }
181
182 @Override
183 public IntentState getIntentState(IntentId intentId) {
184 Timestamped<IntentState> state = intentStates.get(intentId);
185 if (state != null) {
186 return state.value();
187 }
188 return null;
189 }
190
Jonathan Hart5573d322015-01-21 10:13:25 -0800191 private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
192 switch (newState) {
193 case WITHDRAW_REQ:
194 withdrawRequestedIntents.add(intentId);
195 break;
196 case INSTALL_REQ:
197 case COMPILING:
198 case INSTALLING:
199 case INSTALLED:
200 case RECOMPILING:
201 case WITHDRAWING:
202 case WITHDRAWN:
203 case FAILED:
204 synchronized (intentStates) {
205 Timestamped<IntentState> existing = intentStates.get(intentId);
206 if (existing == null || !existing.isNewer(timestamp)) {
207 intentStates.put(intentId, new Timestamped<>(newState, timestamp));
208 }
209 }
210 break;
211 default:
212 log.warn("Unknown intent state {}", newState);
213 break;
214 }
215
216 try {
217 // TODO make sure it's OK if the intent is null
218 return IntentEvent.getEvent(newState, intents.get(intentId));
219 } catch (IllegalArgumentException e) {
220 // Transient states can't be used for events, so don't send one
221 return null;
222 }
223 }
224
Jonathan Hart5573d322015-01-21 10:13:25 -0800225 private void setInstallableIntentsInternal(IntentId intentId,
226 List<Intent> installableIntents,
227 Timestamp timestamp) {
228 synchronized (installables) {
229 Timestamped<List<Intent>> existing = installables.get(intentId);
230 if (existing == null || !existing.isNewer(timestamp)) {
231 installables.put(intentId,
232 new Timestamped<>(installableIntents, timestamp));
233 }
234 }
235 }
236
237 @Override
238 public List<Intent> getInstallableIntents(IntentId intentId) {
239 Timestamped<List<Intent>> tInstallables = installables.get(intentId);
240 if (tInstallables != null) {
241 return tInstallables.value();
242 }
243 return null;
244 }
245
246 @Override
Jonathan Hart5573d322015-01-21 10:13:25 -0800247 public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
248
249 List<IntentEvent> events = Lists.newArrayList();
250 List<BatchWrite.Operation> failed = new ArrayList<>();
251
Jonathan Hart229794b2015-01-23 16:15:07 -0800252 Timestamp timestamp = null;
253
Jonathan Hart5573d322015-01-21 10:13:25 -0800254 for (BatchWrite.Operation op : batch.operations()) {
255 switch (op.type()) {
256 case CREATE_INTENT:
257 checkArgument(op.args().size() == 1,
258 "CREATE_INTENT takes 1 argument. %s", op);
259 Intent intent = op.arg(0);
260
Jonathan Hart229794b2015-01-23 16:15:07 -0800261 timestamp = intentClockService.getTimestamp(intent.id());
262 if (createIntentInternal(intent)) {
263 events.add(setStateInternal(intent.id(), INSTALL_REQ, timestamp));
264 notifyPeers(new InternalIntentEvent(intent.id(), intent,
265 INSTALL_REQ, timestamp));
266 }
Jonathan Hart5573d322015-01-21 10:13:25 -0800267
268 break;
269 case REMOVE_INTENT:
270 checkArgument(op.args().size() == 1,
271 "REMOVE_INTENT takes 1 argument. %s", op);
272 IntentId intentId = (IntentId) op.arg(0);
273 // TODO implement
274
275 break;
276 case SET_STATE:
277 checkArgument(op.args().size() == 2,
278 "SET_STATE takes 2 arguments. %s", op);
279 intent = op.arg(0);
280 IntentState newState = op.arg(1);
281
Jonathan Hart229794b2015-01-23 16:15:07 -0800282 timestamp = intentClockService.getTimestamp(intent.id());
Jonathan Hart5573d322015-01-21 10:13:25 -0800283 IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
284 events.add(externalEvent);
285 notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
286
287 break;
288 case SET_INSTALLABLE:
289 checkArgument(op.args().size() == 2,
290 "SET_INSTALLABLE takes 2 arguments. %s", op);
291 intentId = op.arg(0);
292 List<Intent> installableIntents = op.arg(1);
293
Jonathan Hart229794b2015-01-23 16:15:07 -0800294 timestamp = intentClockService.getTimestamp(intentId);
Jonathan Hart5573d322015-01-21 10:13:25 -0800295 setInstallableIntentsInternal(
Jonathan Hart229794b2015-01-23 16:15:07 -0800296 intentId, installableIntents, timestamp);
Jonathan Hart5573d322015-01-21 10:13:25 -0800297
Jonathan Hart229794b2015-01-23 16:15:07 -0800298 notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp));
Jonathan Hart5573d322015-01-21 10:13:25 -0800299
300 break;
301 case REMOVE_INSTALLED:
302 checkArgument(op.args().size() == 1,
303 "REMOVE_INSTALLED takes 1 argument. %s", op);
304 intentId = op.arg(0);
305 // TODO implement
306 break;
307 default:
308 log.warn("Unknown Operation encountered: {}", op);
309 failed.add(op);
310 break;
311 }
312 }
313
314 notifyDelegate(events);
315 return failed;
316 }
317
Jonathan Hart229794b2015-01-23 16:15:07 -0800318 private boolean createIntentInternal(Intent intent) {
Jonathan Hart5573d322015-01-21 10:13:25 -0800319 Intent oldValue = intents.putIfAbsent(intent.id(), intent);
320 if (oldValue == null) {
Jonathan Hart229794b2015-01-23 16:15:07 -0800321 return true;
Jonathan Hart5573d322015-01-21 10:13:25 -0800322 }
323
324 log.warn("Intent ID {} already in store, throwing new update away",
325 intent.id());
Jonathan Hart229794b2015-01-23 16:15:07 -0800326 return false;
Jonathan Hart5573d322015-01-21 10:13:25 -0800327 }
328
329 private void notifyPeers(InternalIntentEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800330 broadcastMessage(INTENT_UPDATED_MSG, event);
Jonathan Hart5573d322015-01-21 10:13:25 -0800331 }
332
333 private void notifyPeers(InternalSetInstallablesEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800334 broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
Jonathan Hart5573d322015-01-21 10:13:25 -0800335 }
336
Jonathan Hart7d656f42015-01-27 14:07:23 -0800337 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hart5573d322015-01-21 10:13:25 -0800338 ClusterMessage message = new ClusterMessage(
339 clusterService.getLocalNode().id(),
340 subject,
341 SERIALIZER.encode(event));
342 clusterCommunicator.broadcast(message);
343 }
344
345 private void unicastMessage(NodeId peer,
346 MessageSubject subject,
347 Object event) throws IOException {
348 ClusterMessage message = new ClusterMessage(
349 clusterService.getLocalNode().id(),
350 subject,
351 SERIALIZER.encode(event));
352 clusterCommunicator.unicast(message, peer);
353 }
354
355 private void notifyDelegateIfNotNull(IntentEvent event) {
356 if (event != null) {
357 notifyDelegate(event);
358 }
359 }
360
361 private final class InternalIntentCreateOrUpdateEventListener
362 implements ClusterMessageHandler {
363 @Override
364 public void handle(ClusterMessage message) {
365
366 log.debug("Received intent update event from peer: {}", message.sender());
367 InternalIntentEvent event = SERIALIZER.decode(message.payload());
368
369 IntentId intentId = event.intentId();
370 Intent intent = event.intent();
371 IntentState state = event.state();
372 Timestamp timestamp = event.timestamp();
373
374 executor.submit(() -> {
375 try {
376 switch (state) {
377 case INSTALL_REQ:
Jonathan Hart229794b2015-01-23 16:15:07 -0800378 createIntentInternal(intent);
379 // Fallthrough to setStateInternal for INSTALL_REQ
Jonathan Hart5573d322015-01-21 10:13:25 -0800380 default:
381 notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
382 break;
383 }
384 } catch (Exception e) {
385 log.warn("Exception thrown handling intent create or update", e);
386 }
387 });
388 }
389 }
390
391 private final class InternalIntentSetInstallablesListener
392 implements ClusterMessageHandler {
393 @Override
394 public void handle(ClusterMessage message) {
395 log.debug("Received intent set installables event from peer: {}", message.sender());
396 InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
397
398 IntentId intentId = event.intentId();
399 List<Intent> installables = event.installables();
400 Timestamp timestamp = event.timestamp();
401
402 executor.submit(() -> {
403 try {
404 setInstallableIntentsInternal(intentId, installables, timestamp);
405 } catch (Exception e) {
406 log.warn("Exception thrown handling intent set installables", e);
407 }
408 });
409 }
410 }
411
412 private final class InternalIntentAntiEntropyAdvertisementListener
413 implements ClusterMessageHandler {
414
415 @Override
416 public void handle(ClusterMessage message) {
417 log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
418 // TODO implement
419 //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
420 backgroundExecutor.submit(() -> {
421 try {
422 log.debug("something");
423 //handleAntiEntropyAdvertisement(advertisement);
424 } catch (Exception e) {
425 log.warn("Exception thrown handling intent advertisements", e);
426 }
427 });
428 }
429 }
430}
431