blob: 9809d11aef71443d5c9756c689f676b7f0a876df [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska90b453f2015-01-30 18:57:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Andrea Campanellac8eca242016-04-06 19:34:32 -070020import com.google.common.collect.Lists;
Thomas Vachuska761f0042015-11-11 19:10:17 -080021import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
30import org.onlab.util.KryoNamespace;
31import org.onosproject.app.ApplicationDescription;
32import org.onosproject.app.ApplicationEvent;
33import org.onosproject.app.ApplicationException;
34import org.onosproject.app.ApplicationState;
35import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080036import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080037import org.onosproject.cluster.ClusterService;
38import org.onosproject.cluster.ControllerNode;
39import org.onosproject.common.app.ApplicationArchive;
40import org.onosproject.core.Application;
41import org.onosproject.core.ApplicationId;
42import org.onosproject.core.ApplicationIdStore;
Thomas Vachuska761f0042015-11-11 19:10:17 -080043import org.onosproject.core.CoreService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080044import org.onosproject.core.DefaultApplication;
Changhoon Yoonb856b812015-08-10 03:47:19 +090045import org.onosproject.security.Permission;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080046import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080047import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080048import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070049import org.onosproject.store.service.EventuallyConsistentMap;
50import org.onosproject.store.service.EventuallyConsistentMapEvent;
51import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070052import org.onosproject.store.service.LogicalClockService;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070053import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani01e05fb2015-08-13 13:29:36 -070054import org.onosproject.store.service.StorageException;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070055import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import org.slf4j.Logger;
57
58import java.io.ByteArrayInputStream;
Madan Jampani01e05fb2015-08-13 13:29:36 -070059import java.io.IOException;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.io.InputStream;
Andrea Campanellac8eca242016-04-06 19:34:32 -070061import java.util.List;
62import java.util.Optional;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080063import java.util.Set;
64import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080065import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080066import java.util.concurrent.Executors;
67import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartd6fb0532016-01-21 17:28:20 -080068import java.util.concurrent.TimeUnit;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070069import java.util.function.Function;
70
Thomas Vachuska761f0042015-11-11 19:10:17 -080071import static com.google.common.collect.Multimaps.newSetMultimap;
72import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080073import static com.google.common.io.ByteStreams.toByteArray;
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -080074import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080075import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080076import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskaadba1522015-06-04 15:08:30 -070077import static org.onlab.util.Tools.randomDelay;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070078import static org.onosproject.app.ApplicationEvent.Type.*;
79import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070080import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
81import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080082import static org.slf4j.LoggerFactory.getLogger;
83
84/**
85 * Manages inventory of applications in a distributed data store that uses
86 * optimistic replication and gossip based anti-entropy techniques.
87 */
88@Component(immediate = true)
89@Service
90public class GossipApplicationStore extends ApplicationArchive
91 implements ApplicationStore {
92
93 private final Logger log = getLogger(getClass());
94
95 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
96
Thomas Vachuskaadba1522015-06-04 15:08:30 -070097 private static final int MAX_LOAD_RETRIES = 5;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070098 private static final int RETRY_DELAY_MS = 2_000;
99
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800100 private static final int FETCH_TIMEOUT_MS = 10_000;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800101
Jonathan Hartd6fb0532016-01-21 17:28:20 -0800102 private static final int APP_LOAD_DELAY_MS = 500;
103
Andrea Campanellac8eca242016-04-06 19:34:32 -0700104 private static List<String> pendingApps = Lists.newArrayList();
105
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800106 public enum InternalState {
107 INSTALLED, ACTIVATED, DEACTIVATED
108 }
109
Madan Jampani6b5b7172015-02-23 13:02:26 -0800110 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800111 private ExecutorService messageHandlingExecutor;
112
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800113 private EventuallyConsistentMap<ApplicationId, Application> apps;
114 private EventuallyConsistentMap<Application, InternalState> states;
115 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected ClusterCommunicationService clusterCommunicator;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700124 protected StorageService storageService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700127 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800128
Madan Jampani3e033bd2015-04-08 13:03:49 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800131
Thomas Vachuska761f0042015-11-11 19:10:17 -0800132 // Multimap to track which apps are required by others apps
133 // app -> { required-by, ... }
134 // Apps explicitly activated will be required by the CORE app
135 private final Multimap<ApplicationId, ApplicationId> requiredBy =
136 synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
137
138 private ApplicationId coreAppId;
139
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800140 @Activate
141 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800142 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800143 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800144 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800145 .register(InternalState.class);
146
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800147 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
Madan Jampani6b5b7172015-02-23 13:02:26 -0800148
Madan Jampani2af244a2015-02-22 13:12:01 -0800149 messageHandlingExecutor = Executors.newSingleThreadExecutor(
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800150 groupedThreads("onos/store/app", "message-handler", log));
Madan Jampani2af244a2015-02-22 13:12:01 -0800151
Madan Jampani01e05fb2015-08-13 13:29:36 -0700152 clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
Thomas Vachuska761f0042015-11-11 19:10:17 -0800153 bytes -> new String(bytes, Charsets.UTF_8),
154 name -> {
155 try {
156 return toByteArray(getApplicationInputStream(name));
157 } catch (IOException e) {
158 throw new StorageException(e);
159 }
160 },
161 Function.identity(),
162 messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800163
Thomas Vachuskacf960112015-03-06 22:36:51 -0800164 // FIXME: Consider consolidating into a single map.
165
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700166 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
167 .withName("apps")
168 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700169 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700170 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800171
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700172 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
173 .withName("app-states")
174 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700175 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700176 .build();
177
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800178 states.addListener(new InternalAppStatesListener());
179
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700180 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
181 .withName("app-permissions")
182 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700183 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700184 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800185
Thomas Vachuska761f0042015-11-11 19:10:17 -0800186 coreAppId = getId(CoreService.CORE_APP_NAME);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800187 log.info("Started");
188 }
189
Thomas Vachuskacf960112015-03-06 22:36:51 -0800190 /**
191 * Loads the application inventory from the disk and activates apps if
192 * they are marked to be active.
193 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800194 private void loadFromDisk() {
Charles Chane889e2d2015-11-17 12:01:02 -0800195 getApplicationNames().forEach(appName -> {
196 Application app = loadFromDisk(appName);
197 if (app != null && isActive(app.id().name())) {
198 activate(app.id(), false);
199 // TODO Load app permissions
200 }
201 });
202 }
203
204 private Application loadFromDisk(String appName) {
Andrea Campanellac8eca242016-04-06 19:34:32 -0700205 pendingApps.add(appName);
206
Charles Chane889e2d2015-11-17 12:01:02 -0800207 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
208 try {
209 // Directly return if app already exists
210 ApplicationId appId = getId(appName);
211 if (appId != null) {
Ray Milkey64591a62016-01-12 09:50:45 -0800212 Application application = getApplication(appId);
213 if (application != null) {
Andrea Campanellac8eca242016-04-06 19:34:32 -0700214 pendingApps.remove(appName);
Ray Milkey64591a62016-01-12 09:50:45 -0800215 return application;
216 }
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700217 }
Charles Chane889e2d2015-11-17 12:01:02 -0800218
219 ApplicationDescription appDesc = getApplicationDescription(appName);
Andrea Campanellac8eca242016-04-06 19:34:32 -0700220
221 Optional<String> loop = appDesc.requiredApps().stream()
222 .filter(app -> pendingApps.contains(app)).findAny();
223 if (loop.isPresent()) {
224 log.error("Circular app dependency detected: {} -> {}", pendingApps, loop.get());
225 pendingApps.remove(appName);
226 return null;
227 }
228
Charles Chane889e2d2015-11-17 12:01:02 -0800229 boolean success = appDesc.requiredApps().stream()
230 .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
Andrea Campanellac8eca242016-04-06 19:34:32 -0700231 pendingApps.remove(appName);
232
Charles Chane889e2d2015-11-17 12:01:02 -0800233 return success ? create(appDesc, false) : null;
Andrea Campanellac8eca242016-04-06 19:34:32 -0700234
Charles Chane889e2d2015-11-17 12:01:02 -0800235 } catch (Exception e) {
236 log.warn("Unable to load application {} from disk; retrying", appName);
237 randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
Thomas Vachuskacf960112015-03-06 22:36:51 -0800238 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800239 }
Andrea Campanellac8eca242016-04-06 19:34:32 -0700240 pendingApps.remove(appName);
Charles Chane889e2d2015-11-17 12:01:02 -0800241 return null;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800242 }
243
244 @Deactivate
245 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800246 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
247 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800248 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800249 apps.destroy();
250 states.destroy();
251 permissions.destroy();
252 log.info("Stopped");
253 }
254
255 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800256 public void setDelegate(ApplicationStoreDelegate delegate) {
257 super.setDelegate(delegate);
Jonathan Hartd6fb0532016-01-21 17:28:20 -0800258 executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800259 }
260
261 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800262 public Set<Application> getApplications() {
263 return ImmutableSet.copyOf(apps.values());
264 }
265
266 @Override
267 public ApplicationId getId(String name) {
268 return idStore.getAppId(name);
269 }
270
271 @Override
272 public Application getApplication(ApplicationId appId) {
273 return apps.get(appId);
274 }
275
276 @Override
277 public ApplicationState getState(ApplicationId appId) {
278 Application app = apps.get(appId);
279 InternalState s = app == null ? null : states.get(app);
280 return s == null ? null : s == ACTIVATED ?
281 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
282 }
283
284 @Override
285 public Application create(InputStream appDescStream) {
286 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska761f0042015-11-11 19:10:17 -0800287 if (hasPrerequisites(appDesc)) {
288 return create(appDesc, true);
289 }
290 throw new ApplicationException("Missing dependencies for app " + appDesc.name());
291 }
292
293 private boolean hasPrerequisites(ApplicationDescription app) {
294 return !app.requiredApps().stream().map(n -> getId(n))
295 .anyMatch(id -> id == null || getApplication(id) == null);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800296 }
297
Thomas Vachuska161baf52015-03-27 16:15:39 -0700298 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800299 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700300 if (updateTime) {
301 updateTime(app.id().name());
302 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800303 apps.put(app.id(), app);
304 states.put(app, INSTALLED);
305 return app;
306 }
307
308 @Override
309 public void remove(ApplicationId appId) {
310 Application app = apps.get(appId);
311 if (app != null) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800312 uninstallDependentApps(app);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800313 apps.remove(appId);
314 states.remove(app);
315 permissions.remove(app);
316 }
317 }
318
Thomas Vachuska761f0042015-11-11 19:10:17 -0800319 // Uninstalls all apps that depend on the given app.
320 private void uninstallDependentApps(Application app) {
321 getApplications().stream()
322 .filter(a -> a.requiredApps().contains(app.id().name()))
323 .forEach(a -> remove(a.id()));
324 }
325
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800326 @Override
327 public void activate(ApplicationId appId) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800328 activate(appId, coreAppId);
329 }
330
331 private void activate(ApplicationId appId, ApplicationId forAppId) {
332 requiredBy.put(appId, forAppId);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700333 activate(appId, true);
334 }
335
Thomas Vachuska761f0042015-11-11 19:10:17 -0800336
Thomas Vachuska161baf52015-03-27 16:15:39 -0700337 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800338 Application app = apps.get(appId);
339 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700340 if (updateTime) {
341 updateTime(appId.name());
342 }
Thomas Vachuska761f0042015-11-11 19:10:17 -0800343 activateRequiredApps(app);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800344 states.put(app, ACTIVATED);
345 }
346 }
347
Thomas Vachuska761f0042015-11-11 19:10:17 -0800348 // Activates all apps required by this application.
349 private void activateRequiredApps(Application app) {
350 app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
351 }
352
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800353 @Override
354 public void deactivate(ApplicationId appId) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800355 deactivateDependentApps(getApplication(appId));
356 deactivate(appId, coreAppId);
357 }
358
359 private void deactivate(ApplicationId appId, ApplicationId forAppId) {
360 requiredBy.remove(appId, forAppId);
361 if (requiredBy.get(appId).isEmpty()) {
362 Application app = apps.get(appId);
363 if (app != null) {
364 updateTime(appId.name());
365 states.put(app, DEACTIVATED);
366 deactivateRequiredApps(app);
367 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800368 }
369 }
370
Thomas Vachuska761f0042015-11-11 19:10:17 -0800371 // Deactivates all apps that require this application.
372 private void deactivateDependentApps(Application app) {
373 getApplications().stream()
374 .filter(a -> states.get(a) == ACTIVATED)
375 .filter(a -> a.requiredApps().contains(app.id().name()))
376 .forEach(a -> deactivate(a.id()));
377 }
378
379 // Deactivates all apps required by this application.
380 private void deactivateRequiredApps(Application app) {
381 app.requiredApps().stream().map(this::getId).map(this::getApplication)
382 .filter(a -> states.get(a) == ACTIVATED)
383 .forEach(a -> deactivate(a.id(), app.id()));
384 }
385
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800386 @Override
387 public Set<Permission> getPermissions(ApplicationId appId) {
388 Application app = apps.get(appId);
389 return app != null ? permissions.get(app) : null;
390 }
391
392 @Override
393 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
394 Application app = getApplication(appId);
395 if (app != null) {
396 this.permissions.put(app, permissions);
397 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
398 }
399 }
400
401 /**
402 * Listener to application state distributed map changes.
403 */
404 private final class InternalAppStatesListener
405 implements EventuallyConsistentMapListener<Application, InternalState> {
406 @Override
407 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700408 // If we do not have a delegate, refuse to process any events entirely.
409 // This is to allow the anti-entropy to kick in and process the events
410 // perhaps a bit later, but with opportunity to notify delegate.
411 if (delegate == null) {
412 return;
413 }
414
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800415 Application app = event.key();
416 InternalState state = event.value();
417
418 if (event.type() == PUT) {
419 if (state == INSTALLED) {
420 fetchBitsIfNeeded(app);
421 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
422
423 } else if (state == ACTIVATED) {
424 installAppIfNeeded(app);
425 setActive(app.id().name());
426 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
427
428 } else if (state == DEACTIVATED) {
429 clearActive(app.id().name());
430 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
431 }
432 } else if (event.type() == REMOVE) {
433 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
434 purgeApplication(app.id().name());
435 }
436 }
437 }
438
439 /**
440 * Determines if the application bits are available locally.
441 */
442 private boolean appBitsAvailable(Application app) {
443 try {
444 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
445 return appDesc.version().equals(app.version());
446 } catch (ApplicationException e) {
447 return false;
448 }
449 }
450
451 /**
452 * Fetches the bits from the cluster peers if necessary.
453 */
454 private void fetchBitsIfNeeded(Application app) {
455 if (!appBitsAvailable(app)) {
456 fetchBits(app);
457 }
458 }
459
460 /**
461 * Installs the application if necessary from the application peers.
462 */
463 private void installAppIfNeeded(Application app) {
464 if (!appBitsAvailable(app)) {
465 fetchBits(app);
466 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
467 }
468 }
469
470 /**
471 * Fetches the bits from the cluster peers.
472 */
473 private void fetchBits(Application app) {
474 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800475 CountDownLatch latch = new CountDownLatch(1);
476
477 // FIXME: send message with name & version to make sure we don't get served old bits
478
479 log.info("Downloading bits for application {}", app.id().name());
480 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700481 if (latch.getCount() == 0) {
482 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800483 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700484 if (node.equals(localNode)) {
485 continue;
486 }
487 clusterCommunicator.sendAndReceive(app.id().name(),
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700488 APP_BITS_REQUEST,
489 s -> s.getBytes(Charsets.UTF_8),
490 Function.identity(),
491 node.id())
492 .whenCompleteAsync((bits, error) -> {
493 if (error == null && latch.getCount() > 0) {
494 saveApplication(new ByteArrayInputStream(bits));
495 log.info("Downloaded bits for application {} from node {}",
496 app.id().name(), node.id());
497 latch.countDown();
498 } else if (error != null) {
499 log.warn("Unable to fetch bits for application {} from node {}",
500 app.id().name(), node.id());
501 }
502 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800503 }
504
505 try {
506 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
507 log.warn("Unable to fetch bits for application {}", app.id().name());
508 }
509 } catch (InterruptedException e) {
510 log.warn("Interrupted while fetching bits for application {}", app.id().name());
511 }
512 }
513
514 /**
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800515 * Prunes applications which are not in the map, but are on disk.
516 */
517 private void pruneUninstalledApps() {
518 for (String name : getApplicationNames()) {
519 if (getApplication(getId(name)) == null) {
520 Application app = registerApp(getApplicationDescription(name));
521 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
522 purgeApplication(app.id().name());
523 }
524 }
525 }
526
527 /**
528 * Produces a registered application from the supplied description.
529 */
530 private Application registerApp(ApplicationDescription appDesc) {
531 ApplicationId appId = idStore.registerApplication(appDesc.name());
Simon Huntafae2f72016-03-04 21:18:23 -0800532 return new DefaultApplication(appId,
533 appDesc.version(),
534 appDesc.title(),
535 appDesc.description(),
536 appDesc.origin(),
537 appDesc.category(),
538 appDesc.url(),
539 appDesc.readme(),
540 appDesc.icon(),
541 appDesc.role(),
542 appDesc.permissions(),
543 appDesc.featuresRepo(),
544 appDesc.features(),
545 appDesc.requiredApps());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800546 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800547}