blob: b6c3b984ac114fac6a69e8ad1f84d551db279b70 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.app.ApplicationDescription;
28import org.onosproject.app.ApplicationEvent;
29import org.onosproject.app.ApplicationException;
30import org.onosproject.app.ApplicationState;
31import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080032import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.ControllerNode;
35import org.onosproject.common.app.ApplicationArchive;
36import org.onosproject.core.Application;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.ApplicationIdStore;
39import org.onosproject.core.DefaultApplication;
40import org.onosproject.core.Permission;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.ClusterMessage;
43import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
44import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080045import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070046import org.onosproject.store.service.EventuallyConsistentMap;
47import org.onosproject.store.service.EventuallyConsistentMapEvent;
48import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070049import org.onosproject.store.service.LogicalClockService;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070050import org.onosproject.store.service.MultiValuedTimestamp;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070051import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080052import org.slf4j.Logger;
53
54import java.io.ByteArrayInputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080055import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.util.Set;
57import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080058import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.concurrent.Executors;
60import java.util.concurrent.ScheduledExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070061import java.util.function.Function;
62
Thomas Vachuska90b453f2015-01-30 18:57:14 -080063import static com.google.common.io.ByteStreams.toByteArray;
64import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080065import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskaadba1522015-06-04 15:08:30 -070066import static org.onlab.util.Tools.randomDelay;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070067import static org.onosproject.app.ApplicationEvent.Type.*;
68import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
70import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080071import static org.slf4j.LoggerFactory.getLogger;
72
73/**
74 * Manages inventory of applications in a distributed data store that uses
75 * optimistic replication and gossip based anti-entropy techniques.
76 */
77@Component(immediate = true)
78@Service
79public class GossipApplicationStore extends ApplicationArchive
80 implements ApplicationStore {
81
82 private final Logger log = getLogger(getClass());
83
84 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
85
Thomas Vachuskaadba1522015-06-04 15:08:30 -070086 private static final int MAX_LOAD_RETRIES = 5;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070087 private static final int RETRY_DELAY_MS = 2_000;
88
Thomas Vachuska90b453f2015-01-30 18:57:14 -080089 private static final int FETCH_TIMEOUT_MS = 10_000;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080090
91 public enum InternalState {
92 INSTALLED, ACTIVATED, DEACTIVATED
93 }
94
Madan Jampani6b5b7172015-02-23 13:02:26 -080095 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080096 private ExecutorService messageHandlingExecutor;
97
Thomas Vachuska90b453f2015-01-30 18:57:14 -080098 private EventuallyConsistentMap<ApplicationId, Application> apps;
99 private EventuallyConsistentMap<Application, InternalState> states;
100 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClusterCommunicationService clusterCommunicator;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ClusterService clusterService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700109 protected StorageService storageService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700112 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800113
Madan Jampani3e033bd2015-04-08 13:03:49 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800116
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800117 @Activate
118 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800119 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800120 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800121 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800122 .register(InternalState.class);
123
Madan Jampani6b5b7172015-02-23 13:02:26 -0800124 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
125
Madan Jampani2af244a2015-02-22 13:12:01 -0800126 messageHandlingExecutor = Executors.newSingleThreadExecutor(
127 groupedThreads("onos/store/app", "message-handler"));
128
129 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800130
Thomas Vachuskacf960112015-03-06 22:36:51 -0800131 // FIXME: Consider consolidating into a single map.
132
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700133 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
134 .withName("apps")
135 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700136 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700137 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800138
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700139 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
140 .withName("app-states")
141 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700142 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 .build();
144
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800145 states.addListener(new InternalAppStatesListener());
146
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
148 .withName("app-permissions")
149 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700150 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800152
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800153 log.info("Started");
154 }
155
Thomas Vachuskacf960112015-03-06 22:36:51 -0800156 /**
157 * Loads the application inventory from the disk and activates apps if
158 * they are marked to be active.
159 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800160 private void loadFromDisk() {
161 for (String name : getApplicationNames()) {
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700162 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
163 try {
164 Application app = create(getApplicationDescription(name), false);
165 if (app != null && isActive(app.id().name())) {
166 activate(app.id(), false);
167 // load app permissions
168 }
169 } catch (Exception e) {
170 log.warn("Unable to load application {} from disk; retrying", name);
Thomas Vachuskaadba1522015-06-04 15:08:30 -0700171 randomDelay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700172 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800173 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800174 }
175 }
176
177 @Deactivate
178 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800179 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
180 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800181 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800182 apps.destroy();
183 states.destroy();
184 permissions.destroy();
185 log.info("Stopped");
186 }
187
188 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800189 public void setDelegate(ApplicationStoreDelegate delegate) {
190 super.setDelegate(delegate);
191 loadFromDisk();
192// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
193 }
194
195 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800196 public Set<Application> getApplications() {
197 return ImmutableSet.copyOf(apps.values());
198 }
199
200 @Override
201 public ApplicationId getId(String name) {
202 return idStore.getAppId(name);
203 }
204
205 @Override
206 public Application getApplication(ApplicationId appId) {
207 return apps.get(appId);
208 }
209
210 @Override
211 public ApplicationState getState(ApplicationId appId) {
212 Application app = apps.get(appId);
213 InternalState s = app == null ? null : states.get(app);
214 return s == null ? null : s == ACTIVATED ?
215 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
216 }
217
218 @Override
219 public Application create(InputStream appDescStream) {
220 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700221 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800222 }
223
Thomas Vachuska161baf52015-03-27 16:15:39 -0700224 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800225 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700226 if (updateTime) {
227 updateTime(app.id().name());
228 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800229 apps.put(app.id(), app);
230 states.put(app, INSTALLED);
231 return app;
232 }
233
234 @Override
235 public void remove(ApplicationId appId) {
236 Application app = apps.get(appId);
237 if (app != null) {
238 apps.remove(appId);
239 states.remove(app);
240 permissions.remove(app);
241 }
242 }
243
244 @Override
245 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700246 activate(appId, true);
247 }
248
249 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800250 Application app = apps.get(appId);
251 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700252 if (updateTime) {
253 updateTime(appId.name());
254 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800255 states.put(app, ACTIVATED);
256 }
257 }
258
259 @Override
260 public void deactivate(ApplicationId appId) {
261 Application app = apps.get(appId);
262 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700263 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800264 states.put(app, DEACTIVATED);
265 }
266 }
267
268 @Override
269 public Set<Permission> getPermissions(ApplicationId appId) {
270 Application app = apps.get(appId);
271 return app != null ? permissions.get(app) : null;
272 }
273
274 @Override
275 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
276 Application app = getApplication(appId);
277 if (app != null) {
278 this.permissions.put(app, permissions);
279 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
280 }
281 }
282
283 /**
284 * Listener to application state distributed map changes.
285 */
286 private final class InternalAppStatesListener
287 implements EventuallyConsistentMapListener<Application, InternalState> {
288 @Override
289 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700290 // If we do not have a delegate, refuse to process any events entirely.
291 // This is to allow the anti-entropy to kick in and process the events
292 // perhaps a bit later, but with opportunity to notify delegate.
293 if (delegate == null) {
294 return;
295 }
296
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800297 Application app = event.key();
298 InternalState state = event.value();
299
300 if (event.type() == PUT) {
301 if (state == INSTALLED) {
302 fetchBitsIfNeeded(app);
303 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
304
305 } else if (state == ACTIVATED) {
306 installAppIfNeeded(app);
307 setActive(app.id().name());
308 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
309
310 } else if (state == DEACTIVATED) {
311 clearActive(app.id().name());
312 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
313 }
314 } else if (event.type() == REMOVE) {
315 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
316 purgeApplication(app.id().name());
317 }
318 }
319 }
320
321 /**
322 * Determines if the application bits are available locally.
323 */
324 private boolean appBitsAvailable(Application app) {
325 try {
326 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
327 return appDesc.version().equals(app.version());
328 } catch (ApplicationException e) {
329 return false;
330 }
331 }
332
333 /**
334 * Fetches the bits from the cluster peers if necessary.
335 */
336 private void fetchBitsIfNeeded(Application app) {
337 if (!appBitsAvailable(app)) {
338 fetchBits(app);
339 }
340 }
341
342 /**
343 * Installs the application if necessary from the application peers.
344 */
345 private void installAppIfNeeded(Application app) {
346 if (!appBitsAvailable(app)) {
347 fetchBits(app);
348 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
349 }
350 }
351
352 /**
353 * Fetches the bits from the cluster peers.
354 */
355 private void fetchBits(Application app) {
356 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800357 CountDownLatch latch = new CountDownLatch(1);
358
359 // FIXME: send message with name & version to make sure we don't get served old bits
360
361 log.info("Downloading bits for application {}", app.id().name());
362 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700363 if (latch.getCount() == 0) {
364 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800365 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700366 if (node.equals(localNode)) {
367 continue;
368 }
369 clusterCommunicator.sendAndReceive(app.id().name(),
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700370 APP_BITS_REQUEST,
371 s -> s.getBytes(Charsets.UTF_8),
372 Function.identity(),
373 node.id())
374 .whenCompleteAsync((bits, error) -> {
375 if (error == null && latch.getCount() > 0) {
376 saveApplication(new ByteArrayInputStream(bits));
377 log.info("Downloaded bits for application {} from node {}",
378 app.id().name(), node.id());
379 latch.countDown();
380 } else if (error != null) {
381 log.warn("Unable to fetch bits for application {} from node {}",
382 app.id().name(), node.id());
383 }
384 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800385 }
386
387 try {
388 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
389 log.warn("Unable to fetch bits for application {}", app.id().name());
390 }
391 } catch (InterruptedException e) {
392 log.warn("Interrupted while fetching bits for application {}", app.id().name());
393 }
394 }
395
396 /**
397 * Responder to requests for application bits.
398 */
399 private class InternalBitServer implements ClusterMessageHandler {
400 @Override
401 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800402 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800403 try {
404 message.respond(toByteArray(getApplicationInputStream(name)));
405 } catch (Exception e) {
406 log.debug("Unable to read bits for application {}", name);
407 }
408 }
409 }
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700410
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800411 /**
412 * Prunes applications which are not in the map, but are on disk.
413 */
414 private void pruneUninstalledApps() {
415 for (String name : getApplicationNames()) {
416 if (getApplication(getId(name)) == null) {
417 Application app = registerApp(getApplicationDescription(name));
418 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
419 purgeApplication(app.id().name());
420 }
421 }
422 }
423
424 /**
425 * Produces a registered application from the supplied description.
426 */
427 private Application registerApp(ApplicationDescription appDesc) {
428 ApplicationId appId = idStore.registerApplication(appDesc.name());
429 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
Changhoon Yoonbdeb88a2015-05-12 20:35:31 +0900430 appDesc.origin(), appDesc.role(), appDesc.permissions(),
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800431 appDesc.featuresRepo(), appDesc.features());
432 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800433}