blob: 5c1fc33239606a743b27f3f1035ba4c412cea4e2 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
Madan Jampani3e033bd2015-04-08 13:03:49 -070021
Thomas Vachuska90b453f2015-01-30 18:57:14 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.app.ApplicationDescription;
30import org.onosproject.app.ApplicationEvent;
31import org.onosproject.app.ApplicationException;
32import org.onosproject.app.ApplicationState;
33import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080034import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.ControllerNode;
37import org.onosproject.common.app.ApplicationArchive;
38import org.onosproject.core.Application;
39import org.onosproject.core.ApplicationId;
40import org.onosproject.core.ApplicationIdStore;
41import org.onosproject.core.DefaultApplication;
42import org.onosproject.core.Permission;
43import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
44import org.onosproject.store.cluster.messaging.ClusterMessage;
45import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
46import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuskacf960112015-03-06 22:36:51 -080047import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080048import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070049import org.onosproject.store.service.EventuallyConsistentMap;
50import org.onosproject.store.service.EventuallyConsistentMapEvent;
51import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070052import org.onosproject.store.service.LogicalClockService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070053import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080054import org.slf4j.Logger;
55
56import java.io.ByteArrayInputStream;
57import java.io.IOException;
58import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.Set;
60import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080061import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080062import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080064import static com.google.common.io.ByteStreams.toByteArray;
65import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080066import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
68import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
69import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
70import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
71import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
72import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
73import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
74import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
75import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080077import static org.slf4j.LoggerFactory.getLogger;
78
79/**
80 * Manages inventory of applications in a distributed data store that uses
81 * optimistic replication and gossip based anti-entropy techniques.
82 */
83@Component(immediate = true)
84@Service
85public class GossipApplicationStore extends ApplicationArchive
86 implements ApplicationStore {
87
88 private final Logger log = getLogger(getClass());
89
90 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
91
92 private static final int FETCH_TIMEOUT_MS = 10_000;
93 private static final int LOAD_TIMEOUT_MS = 5_000;
94
95 public enum InternalState {
96 INSTALLED, ACTIVATED, DEACTIVATED
97 }
98
Madan Jampani6b5b7172015-02-23 13:02:26 -080099 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800100 private ExecutorService messageHandlingExecutor;
101
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800102 private EventuallyConsistentMap<ApplicationId, Application> apps;
103 private EventuallyConsistentMap<Application, InternalState> states;
104 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterCommunicationService clusterCommunicator;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterService clusterService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700113 protected StorageService storageService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700116 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800117
Madan Jampani3e033bd2015-04-08 13:03:49 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800120
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800121 @Activate
122 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800123 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800124 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800125 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800126 .register(InternalState.class);
127
Madan Jampani6b5b7172015-02-23 13:02:26 -0800128 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
129
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 messageHandlingExecutor = Executors.newSingleThreadExecutor(
131 groupedThreads("onos/store/app", "message-handler"));
132
133 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800134
Thomas Vachuskacf960112015-03-06 22:36:51 -0800135 // FIXME: Consider consolidating into a single map.
136
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700137 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
138 .withName("apps")
139 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700140 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700141 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800142
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
144 .withName("app-states")
145 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700146 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 .build();
148
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800149 states.addListener(new InternalAppStatesListener());
150
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
152 .withName("app-permissions")
153 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700154 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700155 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800156
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800157 log.info("Started");
158 }
159
Thomas Vachuskacf960112015-03-06 22:36:51 -0800160 /**
161 * Loads the application inventory from the disk and activates apps if
162 * they are marked to be active.
163 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800164 private void loadFromDisk() {
165 for (String name : getApplicationNames()) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700166 Application app = create(getApplicationDescription(name), false);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800167 if (app != null && isActive(app.id().name())) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700168 activate(app.id(), false);
Thomas Vachuskacf960112015-03-06 22:36:51 -0800169 // load app permissions
170 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800171 }
172 }
173
174 @Deactivate
175 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800176 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
177 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800178 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800179 apps.destroy();
180 states.destroy();
181 permissions.destroy();
182 log.info("Stopped");
183 }
184
185 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800186 public void setDelegate(ApplicationStoreDelegate delegate) {
187 super.setDelegate(delegate);
188 loadFromDisk();
189// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
190 }
191
192 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800193 public Set<Application> getApplications() {
194 return ImmutableSet.copyOf(apps.values());
195 }
196
197 @Override
198 public ApplicationId getId(String name) {
199 return idStore.getAppId(name);
200 }
201
202 @Override
203 public Application getApplication(ApplicationId appId) {
204 return apps.get(appId);
205 }
206
207 @Override
208 public ApplicationState getState(ApplicationId appId) {
209 Application app = apps.get(appId);
210 InternalState s = app == null ? null : states.get(app);
211 return s == null ? null : s == ACTIVATED ?
212 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
213 }
214
215 @Override
216 public Application create(InputStream appDescStream) {
217 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700218 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800219 }
220
Thomas Vachuska161baf52015-03-27 16:15:39 -0700221 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800222 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700223 if (updateTime) {
224 updateTime(app.id().name());
225 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800226 apps.put(app.id(), app);
227 states.put(app, INSTALLED);
228 return app;
229 }
230
231 @Override
232 public void remove(ApplicationId appId) {
233 Application app = apps.get(appId);
234 if (app != null) {
235 apps.remove(appId);
236 states.remove(app);
237 permissions.remove(app);
238 }
239 }
240
241 @Override
242 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700243 activate(appId, true);
244 }
245
246 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800247 Application app = apps.get(appId);
248 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700249 if (updateTime) {
250 updateTime(appId.name());
251 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800252 states.put(app, ACTIVATED);
253 }
254 }
255
256 @Override
257 public void deactivate(ApplicationId appId) {
258 Application app = apps.get(appId);
259 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700260 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800261 states.put(app, DEACTIVATED);
262 }
263 }
264
265 @Override
266 public Set<Permission> getPermissions(ApplicationId appId) {
267 Application app = apps.get(appId);
268 return app != null ? permissions.get(app) : null;
269 }
270
271 @Override
272 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
273 Application app = getApplication(appId);
274 if (app != null) {
275 this.permissions.put(app, permissions);
276 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
277 }
278 }
279
280 /**
281 * Listener to application state distributed map changes.
282 */
283 private final class InternalAppStatesListener
284 implements EventuallyConsistentMapListener<Application, InternalState> {
285 @Override
286 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700287 // If we do not have a delegate, refuse to process any events entirely.
288 // This is to allow the anti-entropy to kick in and process the events
289 // perhaps a bit later, but with opportunity to notify delegate.
290 if (delegate == null) {
291 return;
292 }
293
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800294 Application app = event.key();
295 InternalState state = event.value();
296
297 if (event.type() == PUT) {
298 if (state == INSTALLED) {
299 fetchBitsIfNeeded(app);
300 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
301
302 } else if (state == ACTIVATED) {
303 installAppIfNeeded(app);
304 setActive(app.id().name());
305 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
306
307 } else if (state == DEACTIVATED) {
308 clearActive(app.id().name());
309 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
310 }
311 } else if (event.type() == REMOVE) {
312 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
313 purgeApplication(app.id().name());
314 }
315 }
316 }
317
318 /**
319 * Determines if the application bits are available locally.
320 */
321 private boolean appBitsAvailable(Application app) {
322 try {
323 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
324 return appDesc.version().equals(app.version());
325 } catch (ApplicationException e) {
326 return false;
327 }
328 }
329
330 /**
331 * Fetches the bits from the cluster peers if necessary.
332 */
333 private void fetchBitsIfNeeded(Application app) {
334 if (!appBitsAvailable(app)) {
335 fetchBits(app);
336 }
337 }
338
339 /**
340 * Installs the application if necessary from the application peers.
341 */
342 private void installAppIfNeeded(Application app) {
343 if (!appBitsAvailable(app)) {
344 fetchBits(app);
345 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
346 }
347 }
348
349 /**
350 * Fetches the bits from the cluster peers.
351 */
352 private void fetchBits(Application app) {
353 ControllerNode localNode = clusterService.getLocalNode();
354 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800355 app.id().name().getBytes(Charsets.UTF_8));
356 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800357 CountDownLatch latch = new CountDownLatch(1);
358
359 // FIXME: send message with name & version to make sure we don't get served old bits
360
361 log.info("Downloading bits for application {}", app.id().name());
362 for (ControllerNode node : clusterService.getNodes()) {
363 try {
364 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
365 future.addListener(new InternalBitListener(app, node, future, latch), executor);
366 } catch (IOException e) {
367 log.debug("Unable to request bits for application {} from node {}",
368 app.id().name(), node.id());
369 }
370 }
371
372 try {
373 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
374 log.warn("Unable to fetch bits for application {}", app.id().name());
375 }
376 } catch (InterruptedException e) {
377 log.warn("Interrupted while fetching bits for application {}", app.id().name());
378 }
379 }
380
381 /**
382 * Responder to requests for application bits.
383 */
384 private class InternalBitServer implements ClusterMessageHandler {
385 @Override
386 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800387 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800388 try {
389 message.respond(toByteArray(getApplicationInputStream(name)));
390 } catch (Exception e) {
391 log.debug("Unable to read bits for application {}", name);
392 }
393 }
394 }
395
396 /**
397 * Processes completed fetch requests.
398 */
399 private class InternalBitListener implements Runnable {
400 private final Application app;
401 private final ControllerNode node;
402 private final ListenableFuture<byte[]> future;
403 private final CountDownLatch latch;
404
405 public InternalBitListener(Application app, ControllerNode node,
406 ListenableFuture<byte[]> future, CountDownLatch latch) {
407 this.app = app;
408 this.node = node;
409 this.future = future;
410 this.latch = latch;
411 }
412
413 @Override
414 public void run() {
415 if (latch.getCount() > 0 && !future.isCancelled()) {
416 try {
417 byte[] bits = future.get(1, MILLISECONDS);
418 saveApplication(new ByteArrayInputStream(bits));
419 log.info("Downloaded bits for application {} from node {}",
420 app.id().name(), node.id());
421 latch.countDown();
422 } catch (Exception e) {
423 log.warn("Unable to fetch bits for application {} from node {}",
424 app.id().name(), node.id());
425 }
426 }
427 }
428 }
429
430 /**
431 * Prunes applications which are not in the map, but are on disk.
432 */
433 private void pruneUninstalledApps() {
434 for (String name : getApplicationNames()) {
435 if (getApplication(getId(name)) == null) {
436 Application app = registerApp(getApplicationDescription(name));
437 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
438 purgeApplication(app.id().name());
439 }
440 }
441 }
442
443 /**
444 * Produces a registered application from the supplied description.
445 */
446 private Application registerApp(ApplicationDescription appDesc) {
447 ApplicationId appId = idStore.registerApplication(appDesc.name());
448 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
449 appDesc.origin(), appDesc.permissions(),
450 appDesc.featuresRepo(), appDesc.features());
451 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800452
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800453}
454