blob: 0c902bf18fb835a6a386ed54ecdc83be25992db5 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Madan Jampani01e05fb2015-08-13 13:29:36 -070020
Thomas Vachuska90b453f2015-01-30 18:57:14 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080043import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080044import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070045import org.onosproject.store.service.EventuallyConsistentMap;
46import org.onosproject.store.service.EventuallyConsistentMapEvent;
47import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070048import org.onosproject.store.service.LogicalClockService;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070049import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani01e05fb2015-08-13 13:29:36 -070050import org.onosproject.store.service.StorageException;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070051import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080052import org.slf4j.Logger;
53
54import java.io.ByteArrayInputStream;
Madan Jampani01e05fb2015-08-13 13:29:36 -070055import java.io.IOException;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080057import java.util.Set;
58import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080059import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.util.concurrent.Executors;
61import java.util.concurrent.ScheduledExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070062import java.util.function.Function;
63
Thomas Vachuska90b453f2015-01-30 18:57:14 -080064import static com.google.common.io.ByteStreams.toByteArray;
65import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080066import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskaadba1522015-06-04 15:08:30 -070067import static org.onlab.util.Tools.randomDelay;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070068import static org.onosproject.app.ApplicationEvent.Type.*;
69import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
71import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080072import static org.slf4j.LoggerFactory.getLogger;
73
/**
 * Manages inventory of applications in a distributed data store that uses
 * optimistic replication and gossip based anti-entropy techniques.
 */
78@Component(immediate = true)
79@Service
80public class GossipApplicationStore extends ApplicationArchive
81 implements ApplicationStore {
82
83 private final Logger log = getLogger(getClass());
84
85 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
86
Thomas Vachuskaadba1522015-06-04 15:08:30 -070087 private static final int MAX_LOAD_RETRIES = 5;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070088 private static final int RETRY_DELAY_MS = 2_000;
89
Thomas Vachuska90b453f2015-01-30 18:57:14 -080090 private static final int FETCH_TIMEOUT_MS = 10_000;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080091
92 public enum InternalState {
93 INSTALLED, ACTIVATED, DEACTIVATED
94 }
95
Madan Jampani6b5b7172015-02-23 13:02:26 -080096 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080097 private ExecutorService messageHandlingExecutor;
98
Thomas Vachuska90b453f2015-01-30 18:57:14 -080099 private EventuallyConsistentMap<ApplicationId, Application> apps;
100 private EventuallyConsistentMap<Application, InternalState> states;
101 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected ClusterCommunicationService clusterCommunicator;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700110 protected StorageService storageService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700113 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800114
Madan Jampani3e033bd2015-04-08 13:03:49 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800117
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118 @Activate
119 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800120 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800121 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800122 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800123 .register(InternalState.class);
124
Madan Jampani6b5b7172015-02-23 13:02:26 -0800125 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
126
Madan Jampani2af244a2015-02-22 13:12:01 -0800127 messageHandlingExecutor = Executors.newSingleThreadExecutor(
128 groupedThreads("onos/store/app", "message-handler"));
129
Madan Jampani01e05fb2015-08-13 13:29:36 -0700130 clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
131 bytes -> new String(bytes, Charsets.UTF_8),
132 name -> {
133 try {
134 return toByteArray(getApplicationInputStream(name));
135 } catch (IOException e) {
136 throw new StorageException(e);
137 }
138 },
139 Function.identity(),
140 messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800141
Thomas Vachuskacf960112015-03-06 22:36:51 -0800142 // FIXME: Consider consolidating into a single map.
143
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700144 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
145 .withName("apps")
146 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700147 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700148 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800149
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700150 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
151 .withName("app-states")
152 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700153 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700154 .build();
155
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800156 states.addListener(new InternalAppStatesListener());
157
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700158 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
159 .withName("app-permissions")
160 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700161 .withTimestampProvider((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700162 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800163
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800164 log.info("Started");
165 }
166
Thomas Vachuskacf960112015-03-06 22:36:51 -0800167 /**
168 * Loads the application inventory from the disk and activates apps if
169 * they are marked to be active.
170 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800171 private void loadFromDisk() {
172 for (String name : getApplicationNames()) {
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700173 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
174 try {
175 Application app = create(getApplicationDescription(name), false);
176 if (app != null && isActive(app.id().name())) {
177 activate(app.id(), false);
178 // load app permissions
179 }
180 } catch (Exception e) {
181 log.warn("Unable to load application {} from disk; retrying", name);
Thomas Vachuskaadba1522015-06-04 15:08:30 -0700182 randomDelay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700183 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800184 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800185 }
186 }
187
188 @Deactivate
189 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800190 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
191 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800192 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800193 apps.destroy();
194 states.destroy();
195 permissions.destroy();
196 log.info("Stopped");
197 }
198
199 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800200 public void setDelegate(ApplicationStoreDelegate delegate) {
201 super.setDelegate(delegate);
202 loadFromDisk();
203// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
204 }
205
206 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800207 public Set<Application> getApplications() {
208 return ImmutableSet.copyOf(apps.values());
209 }
210
211 @Override
212 public ApplicationId getId(String name) {
213 return idStore.getAppId(name);
214 }
215
216 @Override
217 public Application getApplication(ApplicationId appId) {
218 return apps.get(appId);
219 }
220
221 @Override
222 public ApplicationState getState(ApplicationId appId) {
223 Application app = apps.get(appId);
224 InternalState s = app == null ? null : states.get(app);
225 return s == null ? null : s == ACTIVATED ?
226 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
227 }
228
229 @Override
230 public Application create(InputStream appDescStream) {
231 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700232 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800233 }
234
Thomas Vachuska161baf52015-03-27 16:15:39 -0700235 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800236 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700237 if (updateTime) {
238 updateTime(app.id().name());
239 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800240 apps.put(app.id(), app);
241 states.put(app, INSTALLED);
242 return app;
243 }
244
245 @Override
246 public void remove(ApplicationId appId) {
247 Application app = apps.get(appId);
248 if (app != null) {
249 apps.remove(appId);
250 states.remove(app);
251 permissions.remove(app);
252 }
253 }
254
255 @Override
256 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700257 activate(appId, true);
258 }
259
260 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800261 Application app = apps.get(appId);
262 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700263 if (updateTime) {
264 updateTime(appId.name());
265 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800266 states.put(app, ACTIVATED);
267 }
268 }
269
270 @Override
271 public void deactivate(ApplicationId appId) {
272 Application app = apps.get(appId);
273 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700274 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800275 states.put(app, DEACTIVATED);
276 }
277 }
278
279 @Override
280 public Set<Permission> getPermissions(ApplicationId appId) {
281 Application app = apps.get(appId);
282 return app != null ? permissions.get(app) : null;
283 }
284
285 @Override
286 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
287 Application app = getApplication(appId);
288 if (app != null) {
289 this.permissions.put(app, permissions);
290 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
291 }
292 }
293
294 /**
295 * Listener to application state distributed map changes.
296 */
297 private final class InternalAppStatesListener
298 implements EventuallyConsistentMapListener<Application, InternalState> {
299 @Override
300 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700301 // If we do not have a delegate, refuse to process any events entirely.
302 // This is to allow the anti-entropy to kick in and process the events
303 // perhaps a bit later, but with opportunity to notify delegate.
304 if (delegate == null) {
305 return;
306 }
307
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800308 Application app = event.key();
309 InternalState state = event.value();
310
311 if (event.type() == PUT) {
312 if (state == INSTALLED) {
313 fetchBitsIfNeeded(app);
314 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
315
316 } else if (state == ACTIVATED) {
317 installAppIfNeeded(app);
318 setActive(app.id().name());
319 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
320
321 } else if (state == DEACTIVATED) {
322 clearActive(app.id().name());
323 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
324 }
325 } else if (event.type() == REMOVE) {
326 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
327 purgeApplication(app.id().name());
328 }
329 }
330 }
331
332 /**
333 * Determines if the application bits are available locally.
334 */
335 private boolean appBitsAvailable(Application app) {
336 try {
337 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
338 return appDesc.version().equals(app.version());
339 } catch (ApplicationException e) {
340 return false;
341 }
342 }
343
344 /**
345 * Fetches the bits from the cluster peers if necessary.
346 */
347 private void fetchBitsIfNeeded(Application app) {
348 if (!appBitsAvailable(app)) {
349 fetchBits(app);
350 }
351 }
352
353 /**
354 * Installs the application if necessary from the application peers.
355 */
356 private void installAppIfNeeded(Application app) {
357 if (!appBitsAvailable(app)) {
358 fetchBits(app);
359 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
360 }
361 }
362
363 /**
364 * Fetches the bits from the cluster peers.
365 */
366 private void fetchBits(Application app) {
367 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800368 CountDownLatch latch = new CountDownLatch(1);
369
370 // FIXME: send message with name & version to make sure we don't get served old bits
371
372 log.info("Downloading bits for application {}", app.id().name());
373 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700374 if (latch.getCount() == 0) {
375 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800376 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700377 if (node.equals(localNode)) {
378 continue;
379 }
380 clusterCommunicator.sendAndReceive(app.id().name(),
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700381 APP_BITS_REQUEST,
382 s -> s.getBytes(Charsets.UTF_8),
383 Function.identity(),
384 node.id())
385 .whenCompleteAsync((bits, error) -> {
386 if (error == null && latch.getCount() > 0) {
387 saveApplication(new ByteArrayInputStream(bits));
388 log.info("Downloaded bits for application {} from node {}",
389 app.id().name(), node.id());
390 latch.countDown();
391 } else if (error != null) {
392 log.warn("Unable to fetch bits for application {} from node {}",
393 app.id().name(), node.id());
394 }
395 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800396 }
397
398 try {
399 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
400 log.warn("Unable to fetch bits for application {}", app.id().name());
401 }
402 } catch (InterruptedException e) {
403 log.warn("Interrupted while fetching bits for application {}", app.id().name());
404 }
405 }
406
407 /**
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800408 * Prunes applications which are not in the map, but are on disk.
409 */
410 private void pruneUninstalledApps() {
411 for (String name : getApplicationNames()) {
412 if (getApplication(getId(name)) == null) {
413 Application app = registerApp(getApplicationDescription(name));
414 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
415 purgeApplication(app.id().name());
416 }
417 }
418 }
419
420 /**
421 * Produces a registered application from the supplied description.
422 */
423 private Application registerApp(ApplicationDescription appDesc) {
424 ApplicationId appId = idStore.registerApplication(appDesc.name());
425 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
Changhoon Yoonbdeb88a2015-05-12 20:35:31 +0900426 appDesc.origin(), appDesc.role(), appDesc.permissions(),
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800427 appDesc.featuresRepo(), appDesc.features());
428 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800429}