blob: 4910ce4360cd9148be4fa9dd935ab0c2d0e5e065 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Madan Jampani3e033bd2015-04-08 13:03:49 -070020
Thomas Vachuska90b453f2015-01-30 18:57:14 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuskacf960112015-03-06 22:36:51 -080046import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080047import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070048import org.onosproject.store.service.EventuallyConsistentMap;
49import org.onosproject.store.service.EventuallyConsistentMapEvent;
50import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070051import org.onosproject.store.service.LogicalClockService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070052import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080053import org.slf4j.Logger;
54
55import java.io.ByteArrayInputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080057import java.util.Set;
58import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080059import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.util.concurrent.Executors;
61import java.util.concurrent.ScheduledExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070062import java.util.function.Function;
63
Thomas Vachuska90b453f2015-01-30 18:57:14 -080064import static com.google.common.io.ByteStreams.toByteArray;
65import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080066import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
68import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
69import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
70import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
71import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
72import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
73import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
74import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
75import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080077import static org.slf4j.LoggerFactory.getLogger;
78
79/**
80 * Manages inventory of applications in a distributed data store that uses
81 * optimistic replication and gossip based anti-entropy techniques.
82 */
83@Component(immediate = true)
84@Service
85public class GossipApplicationStore extends ApplicationArchive
86 implements ApplicationStore {
87
Thomas Vachuska62f04a42015-04-22 14:38:34 -070088 private static final int MAX_LOAD_RETRIES = 3;
89
Thomas Vachuska90b453f2015-01-30 18:57:14 -080090 private final Logger log = getLogger(getClass());
91
92 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
93
94 private static final int FETCH_TIMEOUT_MS = 10_000;
95 private static final int LOAD_TIMEOUT_MS = 5_000;
96
97 public enum InternalState {
98 INSTALLED, ACTIVATED, DEACTIVATED
99 }
100
Madan Jampani6b5b7172015-02-23 13:02:26 -0800101 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 private ExecutorService messageHandlingExecutor;
103
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800104 private EventuallyConsistentMap<ApplicationId, Application> apps;
105 private EventuallyConsistentMap<Application, InternalState> states;
106 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterCommunicationService clusterCommunicator;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterService clusterService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 protected StorageService storageService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700118 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800119
Madan Jampani3e033bd2015-04-08 13:03:49 -0700120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800122
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800123 @Activate
124 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800125 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800126 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800127 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800128 .register(InternalState.class);
129
Madan Jampani6b5b7172015-02-23 13:02:26 -0800130 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
131
Madan Jampani2af244a2015-02-22 13:12:01 -0800132 messageHandlingExecutor = Executors.newSingleThreadExecutor(
133 groupedThreads("onos/store/app", "message-handler"));
134
135 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800136
Thomas Vachuskacf960112015-03-06 22:36:51 -0800137 // FIXME: Consider consolidating into a single map.
138
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700139 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
140 .withName("apps")
141 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700142 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800144
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700145 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
146 .withName("app-states")
147 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700148 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700149 .build();
150
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800151 states.addListener(new InternalAppStatesListener());
152
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700153 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
154 .withName("app-permissions")
155 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700156 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700157 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800158
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800159 log.info("Started");
160 }
161
Thomas Vachuskacf960112015-03-06 22:36:51 -0800162 /**
163 * Loads the application inventory from the disk and activates apps if
164 * they are marked to be active.
165 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800166 private void loadFromDisk() {
167 for (String name : getApplicationNames()) {
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700168 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
169 try {
170 Application app = create(getApplicationDescription(name), false);
171 if (app != null && isActive(app.id().name())) {
172 activate(app.id(), false);
173 // load app permissions
174 }
175 } catch (Exception e) {
176 log.warn("Unable to load application {} from disk; retrying", name);
177 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800178 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800179 }
180 }
181
182 @Deactivate
183 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800184 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
185 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800186 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800187 apps.destroy();
188 states.destroy();
189 permissions.destroy();
190 log.info("Stopped");
191 }
192
193 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800194 public void setDelegate(ApplicationStoreDelegate delegate) {
195 super.setDelegate(delegate);
196 loadFromDisk();
197// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
198 }
199
200 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800201 public Set<Application> getApplications() {
202 return ImmutableSet.copyOf(apps.values());
203 }
204
205 @Override
206 public ApplicationId getId(String name) {
207 return idStore.getAppId(name);
208 }
209
210 @Override
211 public Application getApplication(ApplicationId appId) {
212 return apps.get(appId);
213 }
214
215 @Override
216 public ApplicationState getState(ApplicationId appId) {
217 Application app = apps.get(appId);
218 InternalState s = app == null ? null : states.get(app);
219 return s == null ? null : s == ACTIVATED ?
220 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
221 }
222
223 @Override
224 public Application create(InputStream appDescStream) {
225 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700226 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800227 }
228
Thomas Vachuska161baf52015-03-27 16:15:39 -0700229 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800230 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700231 if (updateTime) {
232 updateTime(app.id().name());
233 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800234 apps.put(app.id(), app);
235 states.put(app, INSTALLED);
236 return app;
237 }
238
239 @Override
240 public void remove(ApplicationId appId) {
241 Application app = apps.get(appId);
242 if (app != null) {
243 apps.remove(appId);
244 states.remove(app);
245 permissions.remove(app);
246 }
247 }
248
249 @Override
250 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700251 activate(appId, true);
252 }
253
254 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800255 Application app = apps.get(appId);
256 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700257 if (updateTime) {
258 updateTime(appId.name());
259 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800260 states.put(app, ACTIVATED);
261 }
262 }
263
264 @Override
265 public void deactivate(ApplicationId appId) {
266 Application app = apps.get(appId);
267 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700268 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800269 states.put(app, DEACTIVATED);
270 }
271 }
272
273 @Override
274 public Set<Permission> getPermissions(ApplicationId appId) {
275 Application app = apps.get(appId);
276 return app != null ? permissions.get(app) : null;
277 }
278
279 @Override
280 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
281 Application app = getApplication(appId);
282 if (app != null) {
283 this.permissions.put(app, permissions);
284 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
285 }
286 }
287
288 /**
289 * Listener to application state distributed map changes.
290 */
291 private final class InternalAppStatesListener
292 implements EventuallyConsistentMapListener<Application, InternalState> {
293 @Override
294 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700295 // If we do not have a delegate, refuse to process any events entirely.
296 // This is to allow the anti-entropy to kick in and process the events
297 // perhaps a bit later, but with opportunity to notify delegate.
298 if (delegate == null) {
299 return;
300 }
301
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800302 Application app = event.key();
303 InternalState state = event.value();
304
305 if (event.type() == PUT) {
306 if (state == INSTALLED) {
307 fetchBitsIfNeeded(app);
308 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
309
310 } else if (state == ACTIVATED) {
311 installAppIfNeeded(app);
312 setActive(app.id().name());
313 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
314
315 } else if (state == DEACTIVATED) {
316 clearActive(app.id().name());
317 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
318 }
319 } else if (event.type() == REMOVE) {
320 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
321 purgeApplication(app.id().name());
322 }
323 }
324 }
325
326 /**
327 * Determines if the application bits are available locally.
328 */
329 private boolean appBitsAvailable(Application app) {
330 try {
331 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
332 return appDesc.version().equals(app.version());
333 } catch (ApplicationException e) {
334 return false;
335 }
336 }
337
338 /**
339 * Fetches the bits from the cluster peers if necessary.
340 */
341 private void fetchBitsIfNeeded(Application app) {
342 if (!appBitsAvailable(app)) {
343 fetchBits(app);
344 }
345 }
346
347 /**
348 * Installs the application if necessary from the application peers.
349 */
350 private void installAppIfNeeded(Application app) {
351 if (!appBitsAvailable(app)) {
352 fetchBits(app);
353 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
354 }
355 }
356
357 /**
358 * Fetches the bits from the cluster peers.
359 */
360 private void fetchBits(Application app) {
361 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800362 CountDownLatch latch = new CountDownLatch(1);
363
364 // FIXME: send message with name & version to make sure we don't get served old bits
365
366 log.info("Downloading bits for application {}", app.id().name());
367 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700368 if (latch.getCount() == 0) {
369 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800370 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700371 if (node.equals(localNode)) {
372 continue;
373 }
374 clusterCommunicator.sendAndReceive(app.id().name(),
375 APP_BITS_REQUEST,
376 s -> s.getBytes(Charsets.UTF_8),
377 Function.identity(),
378 node.id())
379 .whenCompleteAsync((bits, error) -> {
380 if (error == null && latch.getCount() > 0) {
381 saveApplication(new ByteArrayInputStream(bits));
382 log.info("Downloaded bits for application {} from node {}",
383 app.id().name(), node.id());
384 latch.countDown();
385 } else if (error != null) {
386 log.warn("Unable to fetch bits for application {} from node {}",
387 app.id().name(), node.id(), error);
388 }
389 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800390 }
391
392 try {
393 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
394 log.warn("Unable to fetch bits for application {}", app.id().name());
395 }
396 } catch (InterruptedException e) {
397 log.warn("Interrupted while fetching bits for application {}", app.id().name());
398 }
399 }
400
401 /**
402 * Responder to requests for application bits.
403 */
404 private class InternalBitServer implements ClusterMessageHandler {
405 @Override
406 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800407 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800408 try {
409 message.respond(toByteArray(getApplicationInputStream(name)));
410 } catch (Exception e) {
411 log.debug("Unable to read bits for application {}", name);
412 }
413 }
414 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800415 /**
416 * Prunes applications which are not in the map, but are on disk.
417 */
418 private void pruneUninstalledApps() {
419 for (String name : getApplicationNames()) {
420 if (getApplication(getId(name)) == null) {
421 Application app = registerApp(getApplicationDescription(name));
422 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
423 purgeApplication(app.id().name());
424 }
425 }
426 }
427
428 /**
429 * Produces a registered application from the supplied description.
430 */
431 private Application registerApp(ApplicationDescription appDesc) {
432 ApplicationId appId = idStore.registerApplication(appDesc.name());
433 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
434 appDesc.origin(), appDesc.permissions(),
435 appDesc.featuresRepo(), appDesc.features());
436 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800437}