blob: 027e1a096b35b363589944e31bda8d69381308fd [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
Madan Jampani6c02d9e2016-05-24 15:09:47 -07002 * Copyright 2016-present Open Networking Laboratory
Thomas Vachuska90b453f2015-01-30 18:57:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070019import com.google.common.base.MoreObjects;
20import com.google.common.base.Preconditions;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080021import com.google.common.collect.ImmutableSet;
Andrea Campanellac8eca242016-04-06 19:34:32 -070022import com.google.common.collect.Lists;
Thomas Vachuska761f0042015-11-11 19:10:17 -080023import com.google.common.collect.Maps;
24import com.google.common.collect.Multimap;
25import com.google.common.collect.Sets;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070026
Thomas Vachuska90b453f2015-01-30 18:57:14 -080027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080033import org.onosproject.app.ApplicationDescription;
34import org.onosproject.app.ApplicationEvent;
35import org.onosproject.app.ApplicationException;
36import org.onosproject.app.ApplicationState;
37import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080038import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080039import org.onosproject.cluster.ClusterService;
40import org.onosproject.cluster.ControllerNode;
41import org.onosproject.common.app.ApplicationArchive;
42import org.onosproject.core.Application;
43import org.onosproject.core.ApplicationId;
44import org.onosproject.core.ApplicationIdStore;
Thomas Vachuska761f0042015-11-11 19:10:17 -080045import org.onosproject.core.CoreService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080046import org.onosproject.core.DefaultApplication;
Changhoon Yoonb856b812015-08-10 03:47:19 +090047import org.onosproject.security.Permission;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080048import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080049import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080050import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070051import org.onosproject.store.service.ConsistentMap;
52import org.onosproject.store.service.MapEvent;
53import org.onosproject.store.service.MapEventListener;
54import org.onosproject.store.service.Serializer;
Madan Jampani01e05fb2015-08-13 13:29:36 -070055import org.onosproject.store.service.StorageException;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070056import org.onosproject.store.service.StorageService;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070057import org.onosproject.store.service.Versioned;
58import org.onosproject.store.service.DistributedPrimitive.Status;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import org.slf4j.Logger;
60
61import java.io.ByteArrayInputStream;
Madan Jampani01e05fb2015-08-13 13:29:36 -070062import java.io.IOException;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080063import java.io.InputStream;
Andrea Campanellac8eca242016-04-06 19:34:32 -070064import java.util.List;
65import java.util.Optional;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080066import java.util.Set;
67import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080068import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080069import java.util.concurrent.Executors;
70import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartd6fb0532016-01-21 17:28:20 -080071import java.util.concurrent.TimeUnit;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070072import java.util.concurrent.atomic.AtomicBoolean;
73import java.util.function.Consumer;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070074import java.util.function.Function;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070075import java.util.stream.Collectors;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070076
Thomas Vachuska761f0042015-11-11 19:10:17 -080077import static com.google.common.collect.Multimaps.newSetMultimap;
78import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080079import static com.google.common.io.ByteStreams.toByteArray;
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -080080import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080081import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080082import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskaadba1522015-06-04 15:08:30 -070083import static org.onlab.util.Tools.randomDelay;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070084import static org.onosproject.app.ApplicationEvent.Type.*;
Madan Jampani6c02d9e2016-05-24 15:09:47 -070085import static org.onosproject.store.app.DistributedApplicationStore.InternalState.*;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080086import static org.slf4j.LoggerFactory.getLogger;
87
88/**
Madan Jampani6c02d9e2016-05-24 15:09:47 -070089 * Manages inventory of applications in a distributed data store providing
90 * stronger consistency guarantees.
Thomas Vachuska90b453f2015-01-30 18:57:14 -080091 */
Madan Jampani6c02d9e2016-05-24 15:09:47 -070092@Component(immediate = true, enabled = true)
Thomas Vachuska90b453f2015-01-30 18:57:14 -080093@Service
Madan Jampani6c02d9e2016-05-24 15:09:47 -070094public class DistributedApplicationStore extends ApplicationArchive
Thomas Vachuska90b453f2015-01-30 18:57:14 -080095 implements ApplicationStore {
96
97 private final Logger log = getLogger(getClass());
98
99 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
100
Thomas Vachuskaadba1522015-06-04 15:08:30 -0700101 private static final int MAX_LOAD_RETRIES = 5;
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700102 private static final int RETRY_DELAY_MS = 2_000;
103
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800104 private static final int FETCH_TIMEOUT_MS = 10_000;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800105
Jonathan Hartd6fb0532016-01-21 17:28:20 -0800106 private static final int APP_LOAD_DELAY_MS = 500;
107
Andrea Campanellac8eca242016-04-06 19:34:32 -0700108 private static List<String> pendingApps = Lists.newArrayList();
109
/**
 * State of an application as recorded in the distributed apps map.
 */
public enum InternalState {
    INSTALLED,
    ACTIVATED,
    DEACTIVATED
}
113
Madan Jampani6b5b7172015-02-23 13:02:26 -0800114 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800115 private ExecutorService messageHandlingExecutor;
116
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700117 private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ClusterCommunicationService clusterCommunicator;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected ClusterService clusterService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700129 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800130
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700131 private final InternalAppsListener appsListener = new InternalAppsListener();
132
133 private Consumer<Status> statusChangeListener;
134
Thomas Vachuska761f0042015-11-11 19:10:17 -0800135 // Multimap to track which apps are required by others apps
136 // app -> { required-by, ... }
137 // Apps explicitly activated will be required by the CORE app
138 private final Multimap<ApplicationId, ApplicationId> requiredBy =
139 synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
140
141 private ApplicationId coreAppId;
142
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800143 @Activate
144 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800145 messageHandlingExecutor = Executors.newSingleThreadExecutor(
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800146 groupedThreads("onos/store/app", "message-handler", log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700147 clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
Thomas Vachuska761f0042015-11-11 19:10:17 -0800148 bytes -> new String(bytes, Charsets.UTF_8),
149 name -> {
150 try {
151 return toByteArray(getApplicationInputStream(name));
152 } catch (IOException e) {
153 throw new StorageException(e);
154 }
155 },
156 Function.identity(),
157 messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800158
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700159 apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
160 .withName("onos-apps")
161 .withRelaxedReadConsistency()
162 .withSerializer(Serializer.using(KryoNamespaces.API,
163 InternalApplicationHolder.class,
164 InternalState.class))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700165 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800166
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700167 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
168 statusChangeListener = status -> {
169 if (status == Status.ACTIVE) {
170 executor.execute(this::bootstrapExistingApplications);
171 }
172 };
173 apps.addListener(appsListener, messageHandlingExecutor);
174 apps.addStatusChangeListener(statusChangeListener);
Thomas Vachuska761f0042015-11-11 19:10:17 -0800175 coreAppId = getId(CoreService.CORE_APP_NAME);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800176 log.info("Started");
177 }
178
Thomas Vachuskacf960112015-03-06 22:36:51 -0800179 /**
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700180 * Processes existing applications from the distributed map. This is done to
181 * account for events that this instance may be have missed due to a staggered start.
182 */
183 void bootstrapExistingApplications() {
184 apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
185
186 }
187
188 /**
Thomas Vachuskacf960112015-03-06 22:36:51 -0800189 * Loads the application inventory from the disk and activates apps if
190 * they are marked to be active.
191 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800192 private void loadFromDisk() {
Charles Chane889e2d2015-11-17 12:01:02 -0800193 getApplicationNames().forEach(appName -> {
194 Application app = loadFromDisk(appName);
195 if (app != null && isActive(app.id().name())) {
196 activate(app.id(), false);
197 // TODO Load app permissions
198 }
199 });
200 }
201
202 private Application loadFromDisk(String appName) {
Andrea Campanellac8eca242016-04-06 19:34:32 -0700203 pendingApps.add(appName);
204
Charles Chane889e2d2015-11-17 12:01:02 -0800205 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
206 try {
207 // Directly return if app already exists
208 ApplicationId appId = getId(appName);
209 if (appId != null) {
Ray Milkey64591a62016-01-12 09:50:45 -0800210 Application application = getApplication(appId);
211 if (application != null) {
Andrea Campanellac8eca242016-04-06 19:34:32 -0700212 pendingApps.remove(appName);
Ray Milkey64591a62016-01-12 09:50:45 -0800213 return application;
214 }
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700215 }
Charles Chane889e2d2015-11-17 12:01:02 -0800216
217 ApplicationDescription appDesc = getApplicationDescription(appName);
Andrea Campanellac8eca242016-04-06 19:34:32 -0700218
219 Optional<String> loop = appDesc.requiredApps().stream()
220 .filter(app -> pendingApps.contains(app)).findAny();
221 if (loop.isPresent()) {
222 log.error("Circular app dependency detected: {} -> {}", pendingApps, loop.get());
223 pendingApps.remove(appName);
224 return null;
225 }
226
Charles Chane889e2d2015-11-17 12:01:02 -0800227 boolean success = appDesc.requiredApps().stream()
228 .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
Andrea Campanellac8eca242016-04-06 19:34:32 -0700229 pendingApps.remove(appName);
230
Charles Chane889e2d2015-11-17 12:01:02 -0800231 return success ? create(appDesc, false) : null;
Andrea Campanellac8eca242016-04-06 19:34:32 -0700232
Charles Chane889e2d2015-11-17 12:01:02 -0800233 } catch (Exception e) {
234 log.warn("Unable to load application {} from disk; retrying", appName);
235 randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
Thomas Vachuskacf960112015-03-06 22:36:51 -0800236 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800237 }
Andrea Campanellac8eca242016-04-06 19:34:32 -0700238 pendingApps.remove(appName);
Charles Chane889e2d2015-11-17 12:01:02 -0800239 return null;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800240 }
241
242 @Deactivate
243 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800244 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700245 apps.removeStatusChangeListener(statusChangeListener);
246 apps.removeListener(appsListener);
Madan Jampani2af244a2015-02-22 13:12:01 -0800247 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800248 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800249 log.info("Stopped");
250 }
251
252 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800253 public void setDelegate(ApplicationStoreDelegate delegate) {
254 super.setDelegate(delegate);
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700255 executor.execute(this::bootstrapExistingApplications);
Jonathan Hartd6fb0532016-01-21 17:28:20 -0800256 executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800257 }
258
259 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800260 public Set<Application> getApplications() {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700261 return ImmutableSet.copyOf(apps.values()
262 .stream()
263 .map(Versioned::value)
264 .map(InternalApplicationHolder::app)
265 .collect(Collectors.toSet()));
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800266 }
267
268 @Override
269 public ApplicationId getId(String name) {
270 return idStore.getAppId(name);
271 }
272
273 @Override
274 public Application getApplication(ApplicationId appId) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700275 InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
276 return appHolder != null ? appHolder.app() : null;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800277 }
278
279 @Override
280 public ApplicationState getState(ApplicationId appId) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700281 InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
282 InternalState state = appHolder != null ? appHolder.state() : null;
283 return state == null ? null : state == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800284 }
285
286 @Override
287 public Application create(InputStream appDescStream) {
288 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska761f0042015-11-11 19:10:17 -0800289 if (hasPrerequisites(appDesc)) {
290 return create(appDesc, true);
291 }
Jonathan Hart14651b52016-06-07 17:59:53 -0700292 // Purge bits off disk if we don't have prerequisites to allow app to be
293 // reinstalled later
294 purgeApplication(appDesc.name());
Thomas Vachuska761f0042015-11-11 19:10:17 -0800295 throw new ApplicationException("Missing dependencies for app " + appDesc.name());
296 }
297
298 private boolean hasPrerequisites(ApplicationDescription app) {
299 return !app.requiredApps().stream().map(n -> getId(n))
300 .anyMatch(id -> id == null || getApplication(id) == null);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800301 }
302
Thomas Vachuska161baf52015-03-27 16:15:39 -0700303 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800304 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700305 if (updateTime) {
306 updateTime(app.id().name());
307 }
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700308 InternalApplicationHolder previousApp =
309 Versioned.valueOrNull(apps.putIfAbsent(app.id(), new InternalApplicationHolder(app, INSTALLED, null)));
310 return previousApp != null ? previousApp.app() : app;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800311 }
312
313 @Override
314 public void remove(ApplicationId appId) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700315 uninstallDependentApps(appId);
316 apps.remove(appId);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800317 }
318
Thomas Vachuska761f0042015-11-11 19:10:17 -0800319 // Uninstalls all apps that depend on the given app.
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700320 private void uninstallDependentApps(ApplicationId appId) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800321 getApplications().stream()
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700322 .filter(a -> a.requiredApps().contains(appId.name()))
Thomas Vachuska761f0042015-11-11 19:10:17 -0800323 .forEach(a -> remove(a.id()));
324 }
325
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800326 @Override
327 public void activate(ApplicationId appId) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800328 activate(appId, coreAppId);
329 }
330
331 private void activate(ApplicationId appId, ApplicationId forAppId) {
332 requiredBy.put(appId, forAppId);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700333 activate(appId, true);
334 }
335
Thomas Vachuska761f0042015-11-11 19:10:17 -0800336
Thomas Vachuska161baf52015-03-27 16:15:39 -0700337 private void activate(ApplicationId appId, boolean updateTime) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700338 AtomicBoolean stateChanged = new AtomicBoolean(false);
339 InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.computeIf(appId,
340 v -> v != null && v.state() != ACTIVATED,
341 (k, v) -> {
342 stateChanged.set(true);
343 return new InternalApplicationHolder(v.app(), ACTIVATED, v.permissions());
344 }));
345 if (stateChanged.get()) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700346 if (updateTime) {
347 updateTime(appId.name());
348 }
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700349 activateRequiredApps(appHolder.app());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800350 }
351 }
352
Thomas Vachuska761f0042015-11-11 19:10:17 -0800353 // Activates all apps required by this application.
354 private void activateRequiredApps(Application app) {
355 app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
356 }
357
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800358 @Override
359 public void deactivate(ApplicationId appId) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700360 deactivateDependentApps(appId);
Thomas Vachuska761f0042015-11-11 19:10:17 -0800361 deactivate(appId, coreAppId);
362 }
363
364 private void deactivate(ApplicationId appId, ApplicationId forAppId) {
365 requiredBy.remove(appId, forAppId);
366 if (requiredBy.get(appId).isEmpty()) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700367 AtomicBoolean stateChanged = new AtomicBoolean(false);
368 apps.computeIf(appId,
369 v -> v != null && v.state() != DEACTIVATED,
370 (k, v) -> {
371 stateChanged.set(true);
372 return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
373 });
374 if (stateChanged.get()) {
Thomas Vachuska761f0042015-11-11 19:10:17 -0800375 updateTime(appId.name());
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700376 deactivateRequiredApps(appId);
Thomas Vachuska761f0042015-11-11 19:10:17 -0800377 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800378 }
379 }
380
Thomas Vachuska761f0042015-11-11 19:10:17 -0800381 // Deactivates all apps that require this application.
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700382 private void deactivateDependentApps(ApplicationId appId) {
383 apps.values()
384 .stream()
385 .map(Versioned::value)
386 .filter(a -> a.state() == ACTIVATED)
387 .filter(a -> a.app().requiredApps().contains(appId.name()))
388 .forEach(a -> deactivate(a.app().id()));
Thomas Vachuska761f0042015-11-11 19:10:17 -0800389 }
390
391 // Deactivates all apps required by this application.
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700392 private void deactivateRequiredApps(ApplicationId appId) {
393 getApplication(appId).requiredApps()
394 .stream()
395 .map(this::getId)
396 .map(apps::get)
397 .map(Versioned::value)
398 .filter(a -> a.state() == ACTIVATED)
399 .forEach(a -> deactivate(a.app().id(), appId));
Thomas Vachuska761f0042015-11-11 19:10:17 -0800400 }
401
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800402 @Override
403 public Set<Permission> getPermissions(ApplicationId appId) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700404 InternalApplicationHolder app = Versioned.valueOrNull(apps.get(appId));
405 return app != null ? ImmutableSet.copyOf(app.permissions()) : ImmutableSet.of();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800406 }
407
408 @Override
409 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700410 AtomicBoolean permissionsChanged = new AtomicBoolean(false);
411 Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
412 v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
413 (k, v) -> {
414 permissionsChanged.set(true);
415 return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
416 });
417 if (permissionsChanged.get()) {
418 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800419 }
420 }
421
422 /**
423 * Listener to application state distributed map changes.
424 */
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700425 private final class InternalAppsListener
426 implements MapEventListener<ApplicationId, InternalApplicationHolder> {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800427 @Override
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700428 public void event(MapEvent<ApplicationId, InternalApplicationHolder> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700429 if (delegate == null) {
430 return;
431 }
432
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700433 ApplicationId appId = event.key();
434 InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
435 InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
436 if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
437 if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
438 return;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800439 }
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700440 setupApplicationAndNotify(appId, newApp.app(), newApp.state());
441 } else if (event.type() == MapEvent.Type.REMOVE) {
442 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
443 purgeApplication(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800444 }
445 }
446 }
447
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700448 private void setupApplicationAndNotify(ApplicationId appId, Application app, InternalState state) {
449 if (state == INSTALLED) {
450 fetchBitsIfNeeded(app);
451 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
452 } else if (state == ACTIVATED) {
453 installAppIfNeeded(app);
454 setActive(appId.name());
455 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
456 } else if (state == DEACTIVATED) {
457 clearActive(appId.name());
458 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
459 }
460 }
461
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800462 /**
463 * Determines if the application bits are available locally.
464 */
465 private boolean appBitsAvailable(Application app) {
466 try {
467 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
468 return appDesc.version().equals(app.version());
469 } catch (ApplicationException e) {
470 return false;
471 }
472 }
473
474 /**
475 * Fetches the bits from the cluster peers if necessary.
476 */
477 private void fetchBitsIfNeeded(Application app) {
478 if (!appBitsAvailable(app)) {
479 fetchBits(app);
480 }
481 }
482
483 /**
484 * Installs the application if necessary from the application peers.
485 */
486 private void installAppIfNeeded(Application app) {
487 if (!appBitsAvailable(app)) {
488 fetchBits(app);
489 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
490 }
491 }
492
493 /**
494 * Fetches the bits from the cluster peers.
495 */
496 private void fetchBits(Application app) {
497 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800498 CountDownLatch latch = new CountDownLatch(1);
499
500 // FIXME: send message with name & version to make sure we don't get served old bits
501
502 log.info("Downloading bits for application {}", app.id().name());
503 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700504 if (latch.getCount() == 0) {
505 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800506 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700507 if (node.equals(localNode)) {
508 continue;
509 }
510 clusterCommunicator.sendAndReceive(app.id().name(),
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700511 APP_BITS_REQUEST,
512 s -> s.getBytes(Charsets.UTF_8),
513 Function.identity(),
514 node.id())
515 .whenCompleteAsync((bits, error) -> {
516 if (error == null && latch.getCount() > 0) {
517 saveApplication(new ByteArrayInputStream(bits));
518 log.info("Downloaded bits for application {} from node {}",
519 app.id().name(), node.id());
520 latch.countDown();
521 } else if (error != null) {
522 log.warn("Unable to fetch bits for application {} from node {}",
523 app.id().name(), node.id());
524 }
525 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800526 }
527
528 try {
529 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
530 log.warn("Unable to fetch bits for application {}", app.id().name());
531 }
532 } catch (InterruptedException e) {
533 log.warn("Interrupted while fetching bits for application {}", app.id().name());
534 }
535 }
536
537 /**
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800538 * Produces a registered application from the supplied description.
539 */
540 private Application registerApp(ApplicationDescription appDesc) {
541 ApplicationId appId = idStore.registerApplication(appDesc.name());
Simon Huntafae2f72016-03-04 21:18:23 -0800542 return new DefaultApplication(appId,
543 appDesc.version(),
544 appDesc.title(),
545 appDesc.description(),
546 appDesc.origin(),
547 appDesc.category(),
548 appDesc.url(),
549 appDesc.readme(),
550 appDesc.icon(),
551 appDesc.role(),
552 appDesc.permissions(),
553 appDesc.featuresRepo(),
554 appDesc.features(),
555 appDesc.requiredApps());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800556 }
Madan Jampani6c02d9e2016-05-24 15:09:47 -0700557
558 /**
559 * Internal class for holding app information.
560 */
561 private static class InternalApplicationHolder {
562 private final Application app;
563 private final InternalState state;
564 private final Set<Permission> permissions;
565
566 @SuppressWarnings("unused")
567 private InternalApplicationHolder() {
568 app = null;
569 state = null;
570 permissions = null;
571 }
572
573 public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
574 this.app = Preconditions.checkNotNull(app);
575 this.state = state;
576 this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
577 }
578
579 public Application app() {
580 return app;
581 }
582
583 public InternalState state() {
584 return state;
585 }
586
587 public Set<Permission> permissions() {
588 return permissions;
589 }
590
591 @Override
592 public String toString() {
593 return MoreObjects.toStringHelper(getClass())
594 .add("app", app.id())
595 .add("state", state)
596 .toString();
597 }
598 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800599}