blob: 36699b0addf347b32f6ac0e1ff0c2052d2a7b109 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Thomas Vachuska761f0042015-11-11 19:10:17 -080020import com.google.common.collect.Maps;
21import com.google.common.collect.Multimap;
22import com.google.common.collect.Sets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.app.ApplicationDescription;
31import org.onosproject.app.ApplicationEvent;
32import org.onosproject.app.ApplicationException;
33import org.onosproject.app.ApplicationState;
34import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080035import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080036import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.ControllerNode;
38import org.onosproject.common.app.ApplicationArchive;
39import org.onosproject.core.Application;
40import org.onosproject.core.ApplicationId;
41import org.onosproject.core.ApplicationIdStore;
Thomas Vachuska761f0042015-11-11 19:10:17 -080042import org.onosproject.core.CoreService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080043import org.onosproject.core.DefaultApplication;
Changhoon Yoonb856b812015-08-10 03:47:19 +090044import org.onosproject.security.Permission;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080045import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080046import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080047import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070048import org.onosproject.store.service.EventuallyConsistentMap;
49import org.onosproject.store.service.EventuallyConsistentMapEvent;
50import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070051import org.onosproject.store.service.LogicalClockService;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070052import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani01e05fb2015-08-13 13:29:36 -070053import org.onosproject.store.service.StorageException;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070054import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080055import org.slf4j.Logger;
56
57import java.io.ByteArrayInputStream;
Madan Jampani01e05fb2015-08-13 13:29:36 -070058import java.io.IOException;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.util.Set;
61import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080062import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080063import java.util.concurrent.Executors;
64import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartd6fb0532016-01-21 17:28:20 -080065import java.util.concurrent.TimeUnit;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070066import java.util.function.Function;
67
Thomas Vachuska761f0042015-11-11 19:10:17 -080068import static com.google.common.collect.Multimaps.newSetMultimap;
69import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080070import static com.google.common.io.ByteStreams.toByteArray;
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -080071import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080072import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080073import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskaadba1522015-06-04 15:08:30 -070074import static org.onlab.util.Tools.randomDelay;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070075import static org.onosproject.app.ApplicationEvent.Type.*;
76import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
78import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080079import static org.slf4j.LoggerFactory.getLogger;
80
81/**
82 * Manages inventory of applications in a distributed data store that uses
83 * optimistic replication and gossip based anti-entropy techniques.
84 */
85@Component(immediate = true)
86@Service
87public class GossipApplicationStore extends ApplicationArchive
88 implements ApplicationStore {
89
90 private final Logger log = getLogger(getClass());
91
92 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
93
Thomas Vachuskaadba1522015-06-04 15:08:30 -070094 private static final int MAX_LOAD_RETRIES = 5;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070095 private static final int RETRY_DELAY_MS = 2_000;
96
Thomas Vachuska90b453f2015-01-30 18:57:14 -080097 private static final int FETCH_TIMEOUT_MS = 10_000;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080098
Jonathan Hartd6fb0532016-01-21 17:28:20 -080099 private static final int APP_LOAD_DELAY_MS = 500;
100
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800101 public enum InternalState {
102 INSTALLED, ACTIVATED, DEACTIVATED
103 }
104
Madan Jampani6b5b7172015-02-23 13:02:26 -0800105 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800106 private ExecutorService messageHandlingExecutor;
107
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800108 private EventuallyConsistentMap<ApplicationId, Application> apps;
109 private EventuallyConsistentMap<Application, InternalState> states;
110 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterCommunicationService clusterCommunicator;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ClusterService clusterService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700119 protected StorageService storageService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700122 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800123
Madan Jampani3e033bd2015-04-08 13:03:49 -0700124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800126
Thomas Vachuska761f0042015-11-11 19:10:17 -0800127 // Multimap to track which apps are required by others apps
128 // app -> { required-by, ... }
129 // Apps explicitly activated will be required by the CORE app
130 private final Multimap<ApplicationId, ApplicationId> requiredBy =
131 synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
132
133 private ApplicationId coreAppId;
134
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800135 @Activate
136 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800137 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800138 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800139 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800140 .register(InternalState.class);
141
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800142 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
Madan Jampani6b5b7172015-02-23 13:02:26 -0800143
Madan Jampani2af244a2015-02-22 13:12:01 -0800144 messageHandlingExecutor = Executors.newSingleThreadExecutor(
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800145 groupedThreads("onos/store/app", "message-handler", log));
Madan Jampani2af244a2015-02-22 13:12:01 -0800146
Madan Jampani01e05fb2015-08-13 13:29:36 -0700147 clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
Thomas Vachuska761f0042015-11-11 19:10:17 -0800148 bytes -> new String(bytes, Charsets.UTF_8),
149 name -> {
150 try {
151 return toByteArray(getApplicationInputStream(name));
152 } catch (IOException e) {
153 throw new StorageException(e);
154 }
155 },
156 Function.identity(),
157 messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800158
Thomas Vachuskacf960112015-03-06 22:36:51 -0800159 // FIXME: Consider consolidating into a single map.
160
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700161 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
162 .withName("apps")
163 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700164 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700165 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800166
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700167 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
168 .withName("app-states")
169 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700170 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700171 .build();
172
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800173 states.addListener(new InternalAppStatesListener());
174
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700175 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
176 .withName("app-permissions")
177 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700178 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700179 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800180
Thomas Vachuska761f0042015-11-11 19:10:17 -0800181 coreAppId = getId(CoreService.CORE_APP_NAME);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800182 log.info("Started");
183 }
184
Thomas Vachuskacf960112015-03-06 22:36:51 -0800185 /**
186 * Loads the application inventory from the disk and activates apps if
187 * they are marked to be active.
188 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800189 private void loadFromDisk() {
Charles Chane889e2d2015-11-17 12:01:02 -0800190 getApplicationNames().forEach(appName -> {
191 Application app = loadFromDisk(appName);
192 if (app != null && isActive(app.id().name())) {
193 activate(app.id(), false);
194 // TODO Load app permissions
195 }
196 });
197 }
198
199 private Application loadFromDisk(String appName) {
200 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
201 try {
202 // Directly return if app already exists
203 ApplicationId appId = getId(appName);
204 if (appId != null) {
Ray Milkey64591a62016-01-12 09:50:45 -0800205 Application application = getApplication(appId);
206 if (application != null) {
207 return application;
208 }
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700209 }
Charles Chane889e2d2015-11-17 12:01:02 -0800210
211 ApplicationDescription appDesc = getApplicationDescription(appName);
212 boolean success = appDesc.requiredApps().stream()
213 .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
214 return success ? create(appDesc, false) : null;
215 } catch (Exception e) {
216 log.warn("Unable to load application {} from disk; retrying", appName);
217 randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
Thomas Vachuskacf960112015-03-06 22:36:51 -0800218 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800219 }
Charles Chane889e2d2015-11-17 12:01:02 -0800220 return null;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800221 }
222
223 @Deactivate
224 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800225 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
226 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800227 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800228 apps.destroy();
229 states.destroy();
230 permissions.destroy();
231 log.info("Stopped");
232 }
233
234 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800235 public void setDelegate(ApplicationStoreDelegate delegate) {
236 super.setDelegate(delegate);
Jonathan Hartd6fb0532016-01-21 17:28:20 -0800237 executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800238 }
239
240 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800241 public Set<Application> getApplications() {
242 return ImmutableSet.copyOf(apps.values());
243 }
244
245 @Override
246 public ApplicationId getId(String name) {
247 return idStore.getAppId(name);
248 }
249
250 @Override
251 public Application getApplication(ApplicationId appId) {
252 return apps.get(appId);
253 }
254
255 @Override
256 public ApplicationState getState(ApplicationId appId) {
257 Application app = apps.get(appId);
258 InternalState s = app == null ? null : states.get(app);
259 return s == null ? null : s == ACTIVATED ?
260 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
261 }
262
263 @Override
264 public Application create(InputStream appDescStream) {
265 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska761f0042015-11-11 19:10:17 -0800266 if (hasPrerequisites(appDesc)) {
267 return create(appDesc, true);
268 }
269 throw new ApplicationException("Missing dependencies for app " + appDesc.name());
270 }
271
272 private boolean hasPrerequisites(ApplicationDescription app) {
273 return !app.requiredApps().stream().map(n -> getId(n))
274 .anyMatch(id -> id == null || getApplication(id) == null);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800275 }
276
Thomas Vachuska161baf52015-03-27 16:15:39 -0700277 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800278 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700279 if (updateTime) {
280 updateTime(app.id().name());
281 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800282 apps.put(app.id(), app);
283 states.put(app, INSTALLED);
284 return app;
285 }
286
287 @Override
288 public void remove(ApplicationId appId) {
289 Application app = apps.get(appId);
290 if (app != null) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800291 uninstallDependentApps(app);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800292 apps.remove(appId);
293 states.remove(app);
294 permissions.remove(app);
295 }
296 }
297
Thomas Vachuska761f0042015-11-11 19:10:17 -0800298 // Uninstalls all apps that depend on the given app.
299 private void uninstallDependentApps(Application app) {
300 getApplications().stream()
301 .filter(a -> a.requiredApps().contains(app.id().name()))
302 .forEach(a -> remove(a.id()));
303 }
304
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800305 @Override
306 public void activate(ApplicationId appId) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800307 activate(appId, coreAppId);
308 }
309
310 private void activate(ApplicationId appId, ApplicationId forAppId) {
311 requiredBy.put(appId, forAppId);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700312 activate(appId, true);
313 }
314
Thomas Vachuska761f0042015-11-11 19:10:17 -0800315
Thomas Vachuska161baf52015-03-27 16:15:39 -0700316 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800317 Application app = apps.get(appId);
318 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700319 if (updateTime) {
320 updateTime(appId.name());
321 }
Thomas Vachuska761f0042015-11-11 19:10:17 -0800322 activateRequiredApps(app);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800323 states.put(app, ACTIVATED);
324 }
325 }
326
Thomas Vachuska761f0042015-11-11 19:10:17 -0800327 // Activates all apps required by this application.
328 private void activateRequiredApps(Application app) {
329 app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
330 }
331
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800332 @Override
333 public void deactivate(ApplicationId appId) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800334 deactivateDependentApps(getApplication(appId));
335 deactivate(appId, coreAppId);
336 }
337
338 private void deactivate(ApplicationId appId, ApplicationId forAppId) {
339 requiredBy.remove(appId, forAppId);
340 if (requiredBy.get(appId).isEmpty()) {
341 Application app = apps.get(appId);
342 if (app != null) {
343 updateTime(appId.name());
344 states.put(app, DEACTIVATED);
345 deactivateRequiredApps(app);
346 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800347 }
348 }
349
Thomas Vachuska761f0042015-11-11 19:10:17 -0800350 // Deactivates all apps that require this application.
351 private void deactivateDependentApps(Application app) {
352 getApplications().stream()
353 .filter(a -> states.get(a) == ACTIVATED)
354 .filter(a -> a.requiredApps().contains(app.id().name()))
355 .forEach(a -> deactivate(a.id()));
356 }
357
358 // Deactivates all apps required by this application.
359 private void deactivateRequiredApps(Application app) {
360 app.requiredApps().stream().map(this::getId).map(this::getApplication)
361 .filter(a -> states.get(a) == ACTIVATED)
362 .forEach(a -> deactivate(a.id(), app.id()));
363 }
364
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800365 @Override
366 public Set<Permission> getPermissions(ApplicationId appId) {
367 Application app = apps.get(appId);
368 return app != null ? permissions.get(app) : null;
369 }
370
371 @Override
372 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
373 Application app = getApplication(appId);
374 if (app != null) {
375 this.permissions.put(app, permissions);
376 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
377 }
378 }
379
380 /**
381 * Listener to application state distributed map changes.
382 */
383 private final class InternalAppStatesListener
384 implements EventuallyConsistentMapListener<Application, InternalState> {
385 @Override
386 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700387 // If we do not have a delegate, refuse to process any events entirely.
388 // This is to allow the anti-entropy to kick in and process the events
389 // perhaps a bit later, but with opportunity to notify delegate.
390 if (delegate == null) {
391 return;
392 }
393
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800394 Application app = event.key();
395 InternalState state = event.value();
396
397 if (event.type() == PUT) {
398 if (state == INSTALLED) {
399 fetchBitsIfNeeded(app);
400 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
401
402 } else if (state == ACTIVATED) {
403 installAppIfNeeded(app);
404 setActive(app.id().name());
405 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
406
407 } else if (state == DEACTIVATED) {
408 clearActive(app.id().name());
409 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
410 }
411 } else if (event.type() == REMOVE) {
412 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
413 purgeApplication(app.id().name());
414 }
415 }
416 }
417
418 /**
419 * Determines if the application bits are available locally.
420 */
421 private boolean appBitsAvailable(Application app) {
422 try {
423 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
424 return appDesc.version().equals(app.version());
425 } catch (ApplicationException e) {
426 return false;
427 }
428 }
429
430 /**
431 * Fetches the bits from the cluster peers if necessary.
432 */
433 private void fetchBitsIfNeeded(Application app) {
434 if (!appBitsAvailable(app)) {
435 fetchBits(app);
436 }
437 }
438
439 /**
440 * Installs the application if necessary from the application peers.
441 */
442 private void installAppIfNeeded(Application app) {
443 if (!appBitsAvailable(app)) {
444 fetchBits(app);
445 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
446 }
447 }
448
449 /**
450 * Fetches the bits from the cluster peers.
451 */
452 private void fetchBits(Application app) {
453 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800454 CountDownLatch latch = new CountDownLatch(1);
455
456 // FIXME: send message with name & version to make sure we don't get served old bits
457
458 log.info("Downloading bits for application {}", app.id().name());
459 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700460 if (latch.getCount() == 0) {
461 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800462 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700463 if (node.equals(localNode)) {
464 continue;
465 }
466 clusterCommunicator.sendAndReceive(app.id().name(),
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700467 APP_BITS_REQUEST,
468 s -> s.getBytes(Charsets.UTF_8),
469 Function.identity(),
470 node.id())
471 .whenCompleteAsync((bits, error) -> {
472 if (error == null && latch.getCount() > 0) {
473 saveApplication(new ByteArrayInputStream(bits));
474 log.info("Downloaded bits for application {} from node {}",
475 app.id().name(), node.id());
476 latch.countDown();
477 } else if (error != null) {
478 log.warn("Unable to fetch bits for application {} from node {}",
479 app.id().name(), node.id());
480 }
481 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800482 }
483
484 try {
485 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
486 log.warn("Unable to fetch bits for application {}", app.id().name());
487 }
488 } catch (InterruptedException e) {
489 log.warn("Interrupted while fetching bits for application {}", app.id().name());
490 }
491 }
492
493 /**
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800494 * Prunes applications which are not in the map, but are on disk.
495 */
496 private void pruneUninstalledApps() {
497 for (String name : getApplicationNames()) {
498 if (getApplication(getId(name)) == null) {
499 Application app = registerApp(getApplicationDescription(name));
500 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
501 purgeApplication(app.id().name());
502 }
503 }
504 }
505
506 /**
507 * Produces a registered application from the supplied description.
508 */
509 private Application registerApp(ApplicationDescription appDesc) {
510 ApplicationId appId = idStore.registerApplication(appDesc.name());
Simon Huntafae2f72016-03-04 21:18:23 -0800511 return new DefaultApplication(appId,
512 appDesc.version(),
513 appDesc.title(),
514 appDesc.description(),
515 appDesc.origin(),
516 appDesc.category(),
517 appDesc.url(),
518 appDesc.readme(),
519 appDesc.icon(),
520 appDesc.role(),
521 appDesc.permissions(),
522 appDesc.featuresRepo(),
523 appDesc.features(),
524 appDesc.requiredApps());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800525 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800526}