blob: 1279c0e05b909786c9f1a7ed6825bfeb975140b6 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuskacf960112015-03-06 22:36:51 -080046import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080047import org.onosproject.store.impl.WallclockClockManager;
48import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070049import org.onosproject.store.service.ClockService;
50import org.onosproject.store.service.EventuallyConsistentMap;
51import org.onosproject.store.service.EventuallyConsistentMapEvent;
52import org.onosproject.store.service.EventuallyConsistentMapListener;
53import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080054import org.slf4j.Logger;
55
56import java.io.ByteArrayInputStream;
57import java.io.IOException;
58import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.Set;
60import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080061import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080062import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskacf960112015-03-06 22:36:51 -080064import java.util.concurrent.atomic.AtomicLong;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080065
66import static com.google.common.io.ByteStreams.toByteArray;
67import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080068import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
70import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
71import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
72import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
73import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
74import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
75import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
76import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
77import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
78import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080079import static org.slf4j.LoggerFactory.getLogger;
80
81/**
82 * Manages inventory of applications in a distributed data store that uses
83 * optimistic replication and gossip based anti-entropy techniques.
84 */
85@Component(immediate = true)
86@Service
87public class GossipApplicationStore extends ApplicationArchive
88 implements ApplicationStore {
89
90 private final Logger log = getLogger(getClass());
91
92 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
93
94 private static final int FETCH_TIMEOUT_MS = 10_000;
95 private static final int LOAD_TIMEOUT_MS = 5_000;
96
97 public enum InternalState {
98 INSTALLED, ACTIVATED, DEACTIVATED
99 }
100
Madan Jampani6b5b7172015-02-23 13:02:26 -0800101 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 private ExecutorService messageHandlingExecutor;
103
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800104 private EventuallyConsistentMap<ApplicationId, Application> apps;
105 private EventuallyConsistentMap<Application, InternalState> states;
106 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterCommunicationService clusterCommunicator;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterService clusterService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 protected StorageService storageService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118 protected ApplicationIdStore idStore;
119
Thomas Vachuskacf960112015-03-06 22:36:51 -0800120 private final AtomicLong sequence = new AtomicLong();
121
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800122 @Activate
123 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800124 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800125 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800126 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800127 .register(InternalState.class);
128
Madan Jampani6b5b7172015-02-23 13:02:26 -0800129 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
130
Madan Jampani2af244a2015-02-22 13:12:01 -0800131 messageHandlingExecutor = Executors.newSingleThreadExecutor(
132 groupedThreads("onos/store/app", "message-handler"));
133
134 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800135
Thomas Vachuskacf960112015-03-06 22:36:51 -0800136 // FIXME: Consider consolidating into a single map.
137
138 ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
139 new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
140 sequence.incrementAndGet());
141
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700142 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
143 .withName("apps")
144 .withSerializer(serializer)
145 .withClockService(appsClockService)
146 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800147
148 ClockService<Application, InternalState> statesClockService = (app, state) ->
149 new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
150 sequence.incrementAndGet());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800151
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700152 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
153 .withName("app-states")
154 .withSerializer(serializer)
155 .withClockService(statesClockService)
156 .build();
157
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800158 states.addListener(new InternalAppStatesListener());
159
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700160 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
161 .withName("app-permissions")
162 .withSerializer(serializer)
163 .withClockService(new WallclockClockManager<>())
164 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800165
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800166 log.info("Started");
167 }
168
Thomas Vachuskacf960112015-03-06 22:36:51 -0800169 /**
170 * Loads the application inventory from the disk and activates apps if
171 * they are marked to be active.
172 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800173 private void loadFromDisk() {
174 for (String name : getApplicationNames()) {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800175 Application app = create(getApplicationDescription(name));
176 if (app != null && isActive(app.id().name())) {
177 activate(app.id());
178 // load app permissions
179 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800180 }
181 }
182
183 @Deactivate
184 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800185 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
186 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800187 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800188 apps.destroy();
189 states.destroy();
190 permissions.destroy();
191 log.info("Stopped");
192 }
193
194 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800195 public void setDelegate(ApplicationStoreDelegate delegate) {
196 super.setDelegate(delegate);
197 loadFromDisk();
198// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
199 }
200
201 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800202 public Set<Application> getApplications() {
203 return ImmutableSet.copyOf(apps.values());
204 }
205
206 @Override
207 public ApplicationId getId(String name) {
208 return idStore.getAppId(name);
209 }
210
211 @Override
212 public Application getApplication(ApplicationId appId) {
213 return apps.get(appId);
214 }
215
216 @Override
217 public ApplicationState getState(ApplicationId appId) {
218 Application app = apps.get(appId);
219 InternalState s = app == null ? null : states.get(app);
220 return s == null ? null : s == ACTIVATED ?
221 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
222 }
223
224 @Override
225 public Application create(InputStream appDescStream) {
226 ApplicationDescription appDesc = saveApplication(appDescStream);
227 return create(appDesc);
228 }
229
230 private Application create(ApplicationDescription appDesc) {
231 Application app = registerApp(appDesc);
232 apps.put(app.id(), app);
233 states.put(app, INSTALLED);
234 return app;
235 }
236
237 @Override
238 public void remove(ApplicationId appId) {
239 Application app = apps.get(appId);
240 if (app != null) {
241 apps.remove(appId);
242 states.remove(app);
243 permissions.remove(app);
244 }
245 }
246
247 @Override
248 public void activate(ApplicationId appId) {
249 Application app = apps.get(appId);
250 if (app != null) {
251 states.put(app, ACTIVATED);
252 }
253 }
254
255 @Override
256 public void deactivate(ApplicationId appId) {
257 Application app = apps.get(appId);
258 if (app != null) {
259 states.put(app, DEACTIVATED);
260 }
261 }
262
263 @Override
264 public Set<Permission> getPermissions(ApplicationId appId) {
265 Application app = apps.get(appId);
266 return app != null ? permissions.get(app) : null;
267 }
268
269 @Override
270 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
271 Application app = getApplication(appId);
272 if (app != null) {
273 this.permissions.put(app, permissions);
274 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
275 }
276 }
277
278 /**
279 * Listener to application state distributed map changes.
280 */
281 private final class InternalAppStatesListener
282 implements EventuallyConsistentMapListener<Application, InternalState> {
283 @Override
284 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700285 // If we do not have a delegate, refuse to process any events entirely.
286 // This is to allow the anti-entropy to kick in and process the events
287 // perhaps a bit later, but with opportunity to notify delegate.
288 if (delegate == null) {
289 return;
290 }
291
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800292 Application app = event.key();
293 InternalState state = event.value();
294
295 if (event.type() == PUT) {
296 if (state == INSTALLED) {
297 fetchBitsIfNeeded(app);
298 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
299
300 } else if (state == ACTIVATED) {
301 installAppIfNeeded(app);
302 setActive(app.id().name());
303 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
304
305 } else if (state == DEACTIVATED) {
306 clearActive(app.id().name());
307 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
308 }
309 } else if (event.type() == REMOVE) {
310 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
311 purgeApplication(app.id().name());
312 }
313 }
314 }
315
316 /**
317 * Determines if the application bits are available locally.
318 */
319 private boolean appBitsAvailable(Application app) {
320 try {
321 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
322 return appDesc.version().equals(app.version());
323 } catch (ApplicationException e) {
324 return false;
325 }
326 }
327
328 /**
329 * Fetches the bits from the cluster peers if necessary.
330 */
331 private void fetchBitsIfNeeded(Application app) {
332 if (!appBitsAvailable(app)) {
333 fetchBits(app);
334 }
335 }
336
337 /**
338 * Installs the application if necessary from the application peers.
339 */
340 private void installAppIfNeeded(Application app) {
341 if (!appBitsAvailable(app)) {
342 fetchBits(app);
343 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
344 }
345 }
346
347 /**
348 * Fetches the bits from the cluster peers.
349 */
350 private void fetchBits(Application app) {
351 ControllerNode localNode = clusterService.getLocalNode();
352 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800353 app.id().name().getBytes(Charsets.UTF_8));
354 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800355 CountDownLatch latch = new CountDownLatch(1);
356
357 // FIXME: send message with name & version to make sure we don't get served old bits
358
359 log.info("Downloading bits for application {}", app.id().name());
360 for (ControllerNode node : clusterService.getNodes()) {
361 try {
362 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
363 future.addListener(new InternalBitListener(app, node, future, latch), executor);
364 } catch (IOException e) {
365 log.debug("Unable to request bits for application {} from node {}",
366 app.id().name(), node.id());
367 }
368 }
369
370 try {
371 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
372 log.warn("Unable to fetch bits for application {}", app.id().name());
373 }
374 } catch (InterruptedException e) {
375 log.warn("Interrupted while fetching bits for application {}", app.id().name());
376 }
377 }
378
379 /**
380 * Responder to requests for application bits.
381 */
382 private class InternalBitServer implements ClusterMessageHandler {
383 @Override
384 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800385 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800386 try {
387 message.respond(toByteArray(getApplicationInputStream(name)));
388 } catch (Exception e) {
389 log.debug("Unable to read bits for application {}", name);
390 }
391 }
392 }
393
394 /**
395 * Processes completed fetch requests.
396 */
397 private class InternalBitListener implements Runnable {
398 private final Application app;
399 private final ControllerNode node;
400 private final ListenableFuture<byte[]> future;
401 private final CountDownLatch latch;
402
403 public InternalBitListener(Application app, ControllerNode node,
404 ListenableFuture<byte[]> future, CountDownLatch latch) {
405 this.app = app;
406 this.node = node;
407 this.future = future;
408 this.latch = latch;
409 }
410
411 @Override
412 public void run() {
413 if (latch.getCount() > 0 && !future.isCancelled()) {
414 try {
415 byte[] bits = future.get(1, MILLISECONDS);
416 saveApplication(new ByteArrayInputStream(bits));
417 log.info("Downloaded bits for application {} from node {}",
418 app.id().name(), node.id());
419 latch.countDown();
420 } catch (Exception e) {
421 log.warn("Unable to fetch bits for application {} from node {}",
422 app.id().name(), node.id());
423 }
424 }
425 }
426 }
427
428 /**
429 * Prunes applications which are not in the map, but are on disk.
430 */
431 private void pruneUninstalledApps() {
432 for (String name : getApplicationNames()) {
433 if (getApplication(getId(name)) == null) {
434 Application app = registerApp(getApplicationDescription(name));
435 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
436 purgeApplication(app.id().name());
437 }
438 }
439 }
440
441 /**
442 * Produces a registered application from the supplied description.
443 */
444 private Application registerApp(ApplicationDescription appDesc) {
445 ApplicationId appId = idStore.registerApplication(appDesc.name());
446 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
447 appDesc.origin(), appDesc.permissions(),
448 appDesc.featuresRepo(), appDesc.features());
449 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800450
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800451}
452