blob: 03b5b5e2784ed2c737d0bca495013707e9043e81 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuskacf960112015-03-06 22:36:51 -080046import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080047import org.onosproject.store.impl.WallclockClockManager;
48import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070049import org.onosproject.store.service.ClockService;
50import org.onosproject.store.service.EventuallyConsistentMap;
51import org.onosproject.store.service.EventuallyConsistentMapEvent;
52import org.onosproject.store.service.EventuallyConsistentMapListener;
53import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080054import org.slf4j.Logger;
55
56import java.io.ByteArrayInputStream;
57import java.io.IOException;
58import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.Set;
60import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080061import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080062import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskacf960112015-03-06 22:36:51 -080064import java.util.concurrent.atomic.AtomicLong;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080065
66import static com.google.common.io.ByteStreams.toByteArray;
67import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080068import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
70import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
71import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
72import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
73import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
74import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
75import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
76import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
77import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
78import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080079import static org.slf4j.LoggerFactory.getLogger;
80
81/**
82 * Manages inventory of applications in a distributed data store that uses
83 * optimistic replication and gossip based anti-entropy techniques.
84 */
85@Component(immediate = true)
86@Service
87public class GossipApplicationStore extends ApplicationArchive
88 implements ApplicationStore {
89
90 private final Logger log = getLogger(getClass());
91
92 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
93
94 private static final int FETCH_TIMEOUT_MS = 10_000;
95 private static final int LOAD_TIMEOUT_MS = 5_000;
96
/**
 * States in which this store tracks an application internally.
 * Only {@code ACTIVATED} is reported to callers as active; the other
 * values are reported as installed (see {@code getState}).
 */
public enum InternalState {
    INSTALLED,
    ACTIVATED,
    DEACTIVATED
}
100
Madan Jampani6b5b7172015-02-23 13:02:26 -0800101 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 private ExecutorService messageHandlingExecutor;
103
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800104 private EventuallyConsistentMap<ApplicationId, Application> apps;
105 private EventuallyConsistentMap<Application, InternalState> states;
106 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterCommunicationService clusterCommunicator;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterService clusterService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 protected StorageService storageService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118 protected ApplicationIdStore idStore;
119
Thomas Vachuskacf960112015-03-06 22:36:51 -0800120 private final AtomicLong sequence = new AtomicLong();
121
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800122 @Activate
123 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800124 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800125 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800126 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800127 .register(InternalState.class);
128
Madan Jampani6b5b7172015-02-23 13:02:26 -0800129 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
130
Madan Jampani2af244a2015-02-22 13:12:01 -0800131 messageHandlingExecutor = Executors.newSingleThreadExecutor(
132 groupedThreads("onos/store/app", "message-handler"));
133
134 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800135
Thomas Vachuskacf960112015-03-06 22:36:51 -0800136 // FIXME: Consider consolidating into a single map.
137
138 ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
139 new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
140 sequence.incrementAndGet());
141
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700142 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
143 .withName("apps")
144 .withSerializer(serializer)
145 .withClockService(appsClockService)
146 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800147
148 ClockService<Application, InternalState> statesClockService = (app, state) ->
149 new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
150 sequence.incrementAndGet());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800151
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700152 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
153 .withName("app-states")
154 .withSerializer(serializer)
155 .withClockService(statesClockService)
156 .build();
157
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800158 states.addListener(new InternalAppStatesListener());
159
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700160 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
161 .withName("app-permissions")
162 .withSerializer(serializer)
163 .withClockService(new WallclockClockManager<>())
164 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800165
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800166 log.info("Started");
167 }
168
Thomas Vachuskacf960112015-03-06 22:36:51 -0800169 /**
170 * Loads the application inventory from the disk and activates apps if
171 * they are marked to be active.
172 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800173 private void loadFromDisk() {
174 for (String name : getApplicationNames()) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700175 Application app = create(getApplicationDescription(name), false);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800176 if (app != null && isActive(app.id().name())) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700177 activate(app.id(), false);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800178 // load app permissions
179 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800180 }
181 }
182
183 @Deactivate
184 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800185 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
186 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800187 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800188 apps.destroy();
189 states.destroy();
190 permissions.destroy();
191 log.info("Stopped");
192 }
193
194 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800195 public void setDelegate(ApplicationStoreDelegate delegate) {
196 super.setDelegate(delegate);
197 loadFromDisk();
198// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
199 }
200
201 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800202 public Set<Application> getApplications() {
203 return ImmutableSet.copyOf(apps.values());
204 }
205
206 @Override
207 public ApplicationId getId(String name) {
208 return idStore.getAppId(name);
209 }
210
211 @Override
212 public Application getApplication(ApplicationId appId) {
213 return apps.get(appId);
214 }
215
216 @Override
217 public ApplicationState getState(ApplicationId appId) {
218 Application app = apps.get(appId);
219 InternalState s = app == null ? null : states.get(app);
220 return s == null ? null : s == ACTIVATED ?
221 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
222 }
223
224 @Override
225 public Application create(InputStream appDescStream) {
226 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700227 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800228 }
229
Thomas Vachuska161baf52015-03-27 16:15:39 -0700230 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800231 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700232 if (updateTime) {
233 updateTime(app.id().name());
234 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800235 apps.put(app.id(), app);
236 states.put(app, INSTALLED);
237 return app;
238 }
239
240 @Override
241 public void remove(ApplicationId appId) {
242 Application app = apps.get(appId);
243 if (app != null) {
244 apps.remove(appId);
245 states.remove(app);
246 permissions.remove(app);
247 }
248 }
249
250 @Override
251 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700252 activate(appId, true);
253 }
254
255 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800256 Application app = apps.get(appId);
257 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700258 if (updateTime) {
259 updateTime(appId.name());
260 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800261 states.put(app, ACTIVATED);
262 }
263 }
264
265 @Override
266 public void deactivate(ApplicationId appId) {
267 Application app = apps.get(appId);
268 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700269 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800270 states.put(app, DEACTIVATED);
271 }
272 }
273
274 @Override
275 public Set<Permission> getPermissions(ApplicationId appId) {
276 Application app = apps.get(appId);
277 return app != null ? permissions.get(app) : null;
278 }
279
280 @Override
281 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
282 Application app = getApplication(appId);
283 if (app != null) {
284 this.permissions.put(app, permissions);
285 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
286 }
287 }
288
289 /**
290 * Listener to application state distributed map changes.
291 */
292 private final class InternalAppStatesListener
293 implements EventuallyConsistentMapListener<Application, InternalState> {
294 @Override
295 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700296 // If we do not have a delegate, refuse to process any events entirely.
297 // This is to allow the anti-entropy to kick in and process the events
298 // perhaps a bit later, but with opportunity to notify delegate.
299 if (delegate == null) {
300 return;
301 }
302
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800303 Application app = event.key();
304 InternalState state = event.value();
305
306 if (event.type() == PUT) {
307 if (state == INSTALLED) {
308 fetchBitsIfNeeded(app);
309 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
310
311 } else if (state == ACTIVATED) {
312 installAppIfNeeded(app);
313 setActive(app.id().name());
314 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
315
316 } else if (state == DEACTIVATED) {
317 clearActive(app.id().name());
318 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
319 }
320 } else if (event.type() == REMOVE) {
321 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
322 purgeApplication(app.id().name());
323 }
324 }
325 }
326
327 /**
328 * Determines if the application bits are available locally.
329 */
330 private boolean appBitsAvailable(Application app) {
331 try {
332 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
333 return appDesc.version().equals(app.version());
334 } catch (ApplicationException e) {
335 return false;
336 }
337 }
338
339 /**
340 * Fetches the bits from the cluster peers if necessary.
341 */
342 private void fetchBitsIfNeeded(Application app) {
343 if (!appBitsAvailable(app)) {
344 fetchBits(app);
345 }
346 }
347
348 /**
349 * Installs the application if necessary from the application peers.
350 */
351 private void installAppIfNeeded(Application app) {
352 if (!appBitsAvailable(app)) {
353 fetchBits(app);
354 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
355 }
356 }
357
358 /**
359 * Fetches the bits from the cluster peers.
360 */
361 private void fetchBits(Application app) {
362 ControllerNode localNode = clusterService.getLocalNode();
363 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800364 app.id().name().getBytes(Charsets.UTF_8));
365 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800366 CountDownLatch latch = new CountDownLatch(1);
367
368 // FIXME: send message with name & version to make sure we don't get served old bits
369
370 log.info("Downloading bits for application {}", app.id().name());
371 for (ControllerNode node : clusterService.getNodes()) {
372 try {
373 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
374 future.addListener(new InternalBitListener(app, node, future, latch), executor);
375 } catch (IOException e) {
376 log.debug("Unable to request bits for application {} from node {}",
377 app.id().name(), node.id());
378 }
379 }
380
381 try {
382 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
383 log.warn("Unable to fetch bits for application {}", app.id().name());
384 }
385 } catch (InterruptedException e) {
386 log.warn("Interrupted while fetching bits for application {}", app.id().name());
387 }
388 }
389
390 /**
391 * Responder to requests for application bits.
392 */
393 private class InternalBitServer implements ClusterMessageHandler {
394 @Override
395 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800396 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800397 try {
398 message.respond(toByteArray(getApplicationInputStream(name)));
399 } catch (Exception e) {
400 log.debug("Unable to read bits for application {}", name);
401 }
402 }
403 }
404
405 /**
406 * Processes completed fetch requests.
407 */
408 private class InternalBitListener implements Runnable {
409 private final Application app;
410 private final ControllerNode node;
411 private final ListenableFuture<byte[]> future;
412 private final CountDownLatch latch;
413
414 public InternalBitListener(Application app, ControllerNode node,
415 ListenableFuture<byte[]> future, CountDownLatch latch) {
416 this.app = app;
417 this.node = node;
418 this.future = future;
419 this.latch = latch;
420 }
421
422 @Override
423 public void run() {
424 if (latch.getCount() > 0 && !future.isCancelled()) {
425 try {
426 byte[] bits = future.get(1, MILLISECONDS);
427 saveApplication(new ByteArrayInputStream(bits));
428 log.info("Downloaded bits for application {} from node {}",
429 app.id().name(), node.id());
430 latch.countDown();
431 } catch (Exception e) {
432 log.warn("Unable to fetch bits for application {} from node {}",
433 app.id().name(), node.id());
434 }
435 }
436 }
437 }
438
439 /**
440 * Prunes applications which are not in the map, but are on disk.
441 */
442 private void pruneUninstalledApps() {
443 for (String name : getApplicationNames()) {
444 if (getApplication(getId(name)) == null) {
445 Application app = registerApp(getApplicationDescription(name));
446 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
447 purgeApplication(app.id().name());
448 }
449 }
450 }
451
452 /**
453 * Produces a registered application from the supplied description.
454 */
455 private Application registerApp(ApplicationDescription appDesc) {
456 ApplicationId appId = idStore.registerApplication(appDesc.name());
457 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
458 appDesc.origin(), appDesc.permissions(),
459 appDesc.featuresRepo(), appDesc.features());
460 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800461
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800462}
463