blob: 7e7ec060e691c07684498220aaa4bcd2c36e4b3f [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Madan Jampani3e033bd2015-04-08 13:03:49 -070020
Thomas Vachuska90b453f2015-01-30 18:57:14 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuskacf960112015-03-06 22:36:51 -080046import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080047import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070048import org.onosproject.store.service.EventuallyConsistentMap;
49import org.onosproject.store.service.EventuallyConsistentMapEvent;
50import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070051import org.onosproject.store.service.LogicalClockService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070052import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080053import org.slf4j.Logger;
54
55import java.io.ByteArrayInputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080057import java.util.Set;
58import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080059import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.util.concurrent.Executors;
61import java.util.concurrent.ScheduledExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070062import java.util.function.Function;
63
Thomas Vachuska90b453f2015-01-30 18:57:14 -080064import static com.google.common.io.ByteStreams.toByteArray;
65import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080066import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
68import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
69import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
70import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
71import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
72import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
73import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
74import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
75import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080077import static org.slf4j.LoggerFactory.getLogger;
78
/**
 * Manages inventory of applications in a distributed data store that uses
 * optimistic replication and gossip based anti-entropy techniques.
 */
83@Component(immediate = true)
84@Service
85public class GossipApplicationStore extends ApplicationArchive
86 implements ApplicationStore {
87
88 private final Logger log = getLogger(getClass());
89
90 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
91
92 private static final int FETCH_TIMEOUT_MS = 10_000;
93 private static final int LOAD_TIMEOUT_MS = 5_000;
94
/**
 * Application states as recorded in the replicated application state map.
 */
public enum InternalState {
    /** Value written when an application is created in the store. */
    INSTALLED,
    /** Value written when an application is activated. */
    ACTIVATED,
    /** Value written when an application is deactivated. */
    DEACTIVATED
}
98
Madan Jampani6b5b7172015-02-23 13:02:26 -080099 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800100 private ExecutorService messageHandlingExecutor;
101
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800102 private EventuallyConsistentMap<ApplicationId, Application> apps;
103 private EventuallyConsistentMap<Application, InternalState> states;
104 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterCommunicationService clusterCommunicator;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterService clusterService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700113 protected StorageService storageService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700116 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800117
Madan Jampani3e033bd2015-04-08 13:03:49 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800120
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800121 @Activate
122 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800123 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800124 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800125 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800126 .register(InternalState.class);
127
Madan Jampani6b5b7172015-02-23 13:02:26 -0800128 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
129
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 messageHandlingExecutor = Executors.newSingleThreadExecutor(
131 groupedThreads("onos/store/app", "message-handler"));
132
133 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800134
Thomas Vachuskacf960112015-03-06 22:36:51 -0800135 // FIXME: Consider consolidating into a single map.
136
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700137 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
138 .withName("apps")
139 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700140 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700141 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800142
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
144 .withName("app-states")
145 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700146 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 .build();
148
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800149 states.addListener(new InternalAppStatesListener());
150
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
152 .withName("app-permissions")
153 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700154 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700155 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800156
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800157 log.info("Started");
158 }
159
Thomas Vachuskacf960112015-03-06 22:36:51 -0800160 /**
161 * Loads the application inventory from the disk and activates apps if
162 * they are marked to be active.
163 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800164 private void loadFromDisk() {
165 for (String name : getApplicationNames()) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700166 Application app = create(getApplicationDescription(name), false);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800167 if (app != null && isActive(app.id().name())) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700168 activate(app.id(), false);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800169 // load app permissions
170 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800171 }
172 }
173
174 @Deactivate
175 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800176 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
177 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800178 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800179 apps.destroy();
180 states.destroy();
181 permissions.destroy();
182 log.info("Stopped");
183 }
184
185 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800186 public void setDelegate(ApplicationStoreDelegate delegate) {
187 super.setDelegate(delegate);
188 loadFromDisk();
189// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
190 }
191
192 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800193 public Set<Application> getApplications() {
194 return ImmutableSet.copyOf(apps.values());
195 }
196
197 @Override
198 public ApplicationId getId(String name) {
199 return idStore.getAppId(name);
200 }
201
202 @Override
203 public Application getApplication(ApplicationId appId) {
204 return apps.get(appId);
205 }
206
207 @Override
208 public ApplicationState getState(ApplicationId appId) {
209 Application app = apps.get(appId);
210 InternalState s = app == null ? null : states.get(app);
211 return s == null ? null : s == ACTIVATED ?
212 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
213 }
214
215 @Override
216 public Application create(InputStream appDescStream) {
217 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700218 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800219 }
220
Thomas Vachuska161baf52015-03-27 16:15:39 -0700221 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800222 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700223 if (updateTime) {
224 updateTime(app.id().name());
225 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800226 apps.put(app.id(), app);
227 states.put(app, INSTALLED);
228 return app;
229 }
230
231 @Override
232 public void remove(ApplicationId appId) {
233 Application app = apps.get(appId);
234 if (app != null) {
235 apps.remove(appId);
236 states.remove(app);
237 permissions.remove(app);
238 }
239 }
240
241 @Override
242 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700243 activate(appId, true);
244 }
245
246 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800247 Application app = apps.get(appId);
248 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700249 if (updateTime) {
250 updateTime(appId.name());
251 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800252 states.put(app, ACTIVATED);
253 }
254 }
255
256 @Override
257 public void deactivate(ApplicationId appId) {
258 Application app = apps.get(appId);
259 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700260 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800261 states.put(app, DEACTIVATED);
262 }
263 }
264
265 @Override
266 public Set<Permission> getPermissions(ApplicationId appId) {
267 Application app = apps.get(appId);
268 return app != null ? permissions.get(app) : null;
269 }
270
271 @Override
272 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
273 Application app = getApplication(appId);
274 if (app != null) {
275 this.permissions.put(app, permissions);
276 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
277 }
278 }
279
280 /**
281 * Listener to application state distributed map changes.
282 */
283 private final class InternalAppStatesListener
284 implements EventuallyConsistentMapListener<Application, InternalState> {
285 @Override
286 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700287 // If we do not have a delegate, refuse to process any events entirely.
288 // This is to allow the anti-entropy to kick in and process the events
289 // perhaps a bit later, but with opportunity to notify delegate.
290 if (delegate == null) {
291 return;
292 }
293
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800294 Application app = event.key();
295 InternalState state = event.value();
296
297 if (event.type() == PUT) {
298 if (state == INSTALLED) {
299 fetchBitsIfNeeded(app);
300 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
301
302 } else if (state == ACTIVATED) {
303 installAppIfNeeded(app);
304 setActive(app.id().name());
305 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
306
307 } else if (state == DEACTIVATED) {
308 clearActive(app.id().name());
309 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
310 }
311 } else if (event.type() == REMOVE) {
312 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
313 purgeApplication(app.id().name());
314 }
315 }
316 }
317
318 /**
319 * Determines if the application bits are available locally.
320 */
321 private boolean appBitsAvailable(Application app) {
322 try {
323 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
324 return appDesc.version().equals(app.version());
325 } catch (ApplicationException e) {
326 return false;
327 }
328 }
329
330 /**
331 * Fetches the bits from the cluster peers if necessary.
332 */
333 private void fetchBitsIfNeeded(Application app) {
334 if (!appBitsAvailable(app)) {
335 fetchBits(app);
336 }
337 }
338
339 /**
340 * Installs the application if necessary from the application peers.
341 */
342 private void installAppIfNeeded(Application app) {
343 if (!appBitsAvailable(app)) {
344 fetchBits(app);
345 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
346 }
347 }
348
349 /**
350 * Fetches the bits from the cluster peers.
351 */
352 private void fetchBits(Application app) {
353 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800354 CountDownLatch latch = new CountDownLatch(1);
355
356 // FIXME: send message with name & version to make sure we don't get served old bits
357
358 log.info("Downloading bits for application {}", app.id().name());
359 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700360 if (latch.getCount() == 0) {
361 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800362 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700363 if (node.equals(localNode)) {
364 continue;
365 }
366 clusterCommunicator.sendAndReceive(app.id().name(),
367 APP_BITS_REQUEST,
368 s -> s.getBytes(Charsets.UTF_8),
369 Function.identity(),
370 node.id())
371 .whenCompleteAsync((bits, error) -> {
372 if (error == null && latch.getCount() > 0) {
373 saveApplication(new ByteArrayInputStream(bits));
374 log.info("Downloaded bits for application {} from node {}",
375 app.id().name(), node.id());
376 latch.countDown();
377 } else if (error != null) {
378 log.warn("Unable to fetch bits for application {} from node {}",
379 app.id().name(), node.id(), error);
380 }
381 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800382 }
383
384 try {
385 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
386 log.warn("Unable to fetch bits for application {}", app.id().name());
387 }
388 } catch (InterruptedException e) {
389 log.warn("Interrupted while fetching bits for application {}", app.id().name());
390 }
391 }
392
393 /**
394 * Responder to requests for application bits.
395 */
396 private class InternalBitServer implements ClusterMessageHandler {
397 @Override
398 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800399 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800400 try {
401 message.respond(toByteArray(getApplicationInputStream(name)));
402 } catch (Exception e) {
403 log.debug("Unable to read bits for application {}", name);
404 }
405 }
406 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800407 /**
408 * Prunes applications which are not in the map, but are on disk.
409 */
410 private void pruneUninstalledApps() {
411 for (String name : getApplicationNames()) {
412 if (getApplication(getId(name)) == null) {
413 Application app = registerApp(getApplicationDescription(name));
414 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
415 purgeApplication(app.id().name());
416 }
417 }
418 }
419
420 /**
421 * Produces a registered application from the supplied description.
422 */
423 private Application registerApp(ApplicationDescription appDesc) {
424 ApplicationId appId = idStore.registerApplication(appDesc.name());
425 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
426 appDesc.origin(), appDesc.permissions(),
427 appDesc.featuresRepo(), appDesc.features());
428 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800429}