blob: 936cf4eedcb9bb19c8e3aad9ebe734de9e868748 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.app.ApplicationDescription;
28import org.onosproject.app.ApplicationEvent;
29import org.onosproject.app.ApplicationException;
30import org.onosproject.app.ApplicationState;
31import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080032import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.ControllerNode;
35import org.onosproject.common.app.ApplicationArchive;
36import org.onosproject.core.Application;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.ApplicationIdStore;
39import org.onosproject.core.DefaultApplication;
40import org.onosproject.core.Permission;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.ClusterMessage;
43import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
44import org.onosproject.store.cluster.messaging.MessageSubject;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080045import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070046import org.onosproject.store.service.EventuallyConsistentMap;
47import org.onosproject.store.service.EventuallyConsistentMapEvent;
48import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampani3e033bd2015-04-08 13:03:49 -070049import org.onosproject.store.service.LogicalClockService;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070050import org.onosproject.store.service.MultiValuedTimestamp;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070051import org.onosproject.store.service.StorageService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080052import org.slf4j.Logger;
53
54import java.io.ByteArrayInputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080055import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.util.Set;
57import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080058import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.concurrent.Executors;
60import java.util.concurrent.ScheduledExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070061import java.util.function.Function;
62
Thomas Vachuska90b453f2015-01-30 18:57:14 -080063import static com.google.common.io.ByteStreams.toByteArray;
64import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070065import static org.onlab.util.Tools.delay;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080066import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskad0d58542015-06-03 12:38:44 -070067import static org.onosproject.app.ApplicationEvent.Type.*;
68import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
70import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080071import static org.slf4j.LoggerFactory.getLogger;
72
73/**
74 * Manages inventory of applications in a distributed data store that uses
75 * optimistic replication and gossip based anti-entropy techniques.
76 */
77@Component(immediate = true)
78@Service
79public class GossipApplicationStore extends ApplicationArchive
80 implements ApplicationStore {
81
82 private final Logger log = getLogger(getClass());
83
84 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
85
Thomas Vachuskad0d58542015-06-03 12:38:44 -070086 private static final int MAX_LOAD_RETRIES = 3;
87 private static final int RETRY_DELAY_MS = 2_000;
88
Thomas Vachuska90b453f2015-01-30 18:57:14 -080089 private static final int FETCH_TIMEOUT_MS = 10_000;
90 private static final int LOAD_TIMEOUT_MS = 5_000;
91
92 public enum InternalState {
93 INSTALLED, ACTIVATED, DEACTIVATED
94 }
95
Madan Jampani6b5b7172015-02-23 13:02:26 -080096 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080097 private ExecutorService messageHandlingExecutor;
98
Thomas Vachuska90b453f2015-01-30 18:57:14 -080099 private EventuallyConsistentMap<ApplicationId, Application> apps;
100 private EventuallyConsistentMap<Application, InternalState> states;
101 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected ClusterCommunicationService clusterCommunicator;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700110 protected StorageService storageService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700113 protected LogicalClockService clockService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800114
Madan Jampani3e033bd2015-04-08 13:03:49 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ApplicationIdStore idStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -0800117
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118 @Activate
119 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800120 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800121 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800122 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800123 .register(InternalState.class);
124
Madan Jampani6b5b7172015-02-23 13:02:26 -0800125 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
126
Madan Jampani2af244a2015-02-22 13:12:01 -0800127 messageHandlingExecutor = Executors.newSingleThreadExecutor(
128 groupedThreads("onos/store/app", "message-handler"));
129
130 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800131
Thomas Vachuskacf960112015-03-06 22:36:51 -0800132 // FIXME: Consider consolidating into a single map.
133
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700134 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
135 .withName("apps")
136 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700137 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700138 .build();
Thomas Vachuskacf960112015-03-06 22:36:51 -0800139
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700140 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
141 .withName("app-states")
142 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700143 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700144 .build();
145
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800146 states.addListener(new InternalAppStatesListener());
147
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700148 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
149 .withName("app-permissions")
150 .withSerializer(serializer)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700151 .withClockService((k, v) -> clockService.getTimestamp())
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700152 .build();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800153
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800154 log.info("Started");
155 }
156
Thomas Vachuskacf960112015-03-06 22:36:51 -0800157 /**
158 * Loads the application inventory from the disk and activates apps if
159 * they are marked to be active.
160 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800161 private void loadFromDisk() {
162 for (String name : getApplicationNames()) {
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700163 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
164 try {
165 Application app = create(getApplicationDescription(name), false);
166 if (app != null && isActive(app.id().name())) {
167 activate(app.id(), false);
168 // load app permissions
169 }
170 } catch (Exception e) {
171 log.warn("Unable to load application {} from disk; retrying", name);
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700172 delay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
Thomas Vachuska62f04a42015-04-22 14:38:34 -0700173 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800174 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800175 }
176 }
177
178 @Deactivate
179 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800180 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
181 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800182 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800183 apps.destroy();
184 states.destroy();
185 permissions.destroy();
186 log.info("Stopped");
187 }
188
189 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800190 public void setDelegate(ApplicationStoreDelegate delegate) {
191 super.setDelegate(delegate);
192 loadFromDisk();
193// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
194 }
195
196 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800197 public Set<Application> getApplications() {
198 return ImmutableSet.copyOf(apps.values());
199 }
200
201 @Override
202 public ApplicationId getId(String name) {
203 return idStore.getAppId(name);
204 }
205
206 @Override
207 public Application getApplication(ApplicationId appId) {
208 return apps.get(appId);
209 }
210
211 @Override
212 public ApplicationState getState(ApplicationId appId) {
213 Application app = apps.get(appId);
214 InternalState s = app == null ? null : states.get(app);
215 return s == null ? null : s == ACTIVATED ?
216 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
217 }
218
219 @Override
220 public Application create(InputStream appDescStream) {
221 ApplicationDescription appDesc = saveApplication(appDescStream);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700222 return create(appDesc, true);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800223 }
224
Thomas Vachuska161baf52015-03-27 16:15:39 -0700225 private Application create(ApplicationDescription appDesc, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800226 Application app = registerApp(appDesc);
Thomas Vachuska161baf52015-03-27 16:15:39 -0700227 if (updateTime) {
228 updateTime(app.id().name());
229 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800230 apps.put(app.id(), app);
231 states.put(app, INSTALLED);
232 return app;
233 }
234
235 @Override
236 public void remove(ApplicationId appId) {
237 Application app = apps.get(appId);
238 if (app != null) {
239 apps.remove(appId);
240 states.remove(app);
241 permissions.remove(app);
242 }
243 }
244
245 @Override
246 public void activate(ApplicationId appId) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700247 activate(appId, true);
248 }
249
250 private void activate(ApplicationId appId, boolean updateTime) {
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800251 Application app = apps.get(appId);
252 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700253 if (updateTime) {
254 updateTime(appId.name());
255 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800256 states.put(app, ACTIVATED);
257 }
258 }
259
260 @Override
261 public void deactivate(ApplicationId appId) {
262 Application app = apps.get(appId);
263 if (app != null) {
Thomas Vachuska161baf52015-03-27 16:15:39 -0700264 updateTime(appId.name());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800265 states.put(app, DEACTIVATED);
266 }
267 }
268
269 @Override
270 public Set<Permission> getPermissions(ApplicationId appId) {
271 Application app = apps.get(appId);
272 return app != null ? permissions.get(app) : null;
273 }
274
275 @Override
276 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
277 Application app = getApplication(appId);
278 if (app != null) {
279 this.permissions.put(app, permissions);
280 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
281 }
282 }
283
284 /**
285 * Listener to application state distributed map changes.
286 */
287 private final class InternalAppStatesListener
288 implements EventuallyConsistentMapListener<Application, InternalState> {
289 @Override
290 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700291 // If we do not have a delegate, refuse to process any events entirely.
292 // This is to allow the anti-entropy to kick in and process the events
293 // perhaps a bit later, but with opportunity to notify delegate.
294 if (delegate == null) {
295 return;
296 }
297
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800298 Application app = event.key();
299 InternalState state = event.value();
300
301 if (event.type() == PUT) {
302 if (state == INSTALLED) {
303 fetchBitsIfNeeded(app);
304 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
305
306 } else if (state == ACTIVATED) {
307 installAppIfNeeded(app);
308 setActive(app.id().name());
309 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
310
311 } else if (state == DEACTIVATED) {
312 clearActive(app.id().name());
313 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
314 }
315 } else if (event.type() == REMOVE) {
316 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
317 purgeApplication(app.id().name());
318 }
319 }
320 }
321
322 /**
323 * Determines if the application bits are available locally.
324 */
325 private boolean appBitsAvailable(Application app) {
326 try {
327 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
328 return appDesc.version().equals(app.version());
329 } catch (ApplicationException e) {
330 return false;
331 }
332 }
333
334 /**
335 * Fetches the bits from the cluster peers if necessary.
336 */
337 private void fetchBitsIfNeeded(Application app) {
338 if (!appBitsAvailable(app)) {
339 fetchBits(app);
340 }
341 }
342
343 /**
344 * Installs the application if necessary from the application peers.
345 */
346 private void installAppIfNeeded(Application app) {
347 if (!appBitsAvailable(app)) {
348 fetchBits(app);
349 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
350 }
351 }
352
353 /**
354 * Fetches the bits from the cluster peers.
355 */
356 private void fetchBits(Application app) {
357 ControllerNode localNode = clusterService.getLocalNode();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800358 CountDownLatch latch = new CountDownLatch(1);
359
360 // FIXME: send message with name & version to make sure we don't get served old bits
361
362 log.info("Downloading bits for application {}", app.id().name());
363 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700364 if (latch.getCount() == 0) {
365 break;
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800366 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700367 if (node.equals(localNode)) {
368 continue;
369 }
370 clusterCommunicator.sendAndReceive(app.id().name(),
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700371 APP_BITS_REQUEST,
372 s -> s.getBytes(Charsets.UTF_8),
373 Function.identity(),
374 node.id())
375 .whenCompleteAsync((bits, error) -> {
376 if (error == null && latch.getCount() > 0) {
377 saveApplication(new ByteArrayInputStream(bits));
378 log.info("Downloaded bits for application {} from node {}",
379 app.id().name(), node.id());
380 latch.countDown();
381 } else if (error != null) {
382 log.warn("Unable to fetch bits for application {} from node {}",
383 app.id().name(), node.id());
384 }
385 }, executor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800386 }
387
388 try {
389 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
390 log.warn("Unable to fetch bits for application {}", app.id().name());
391 }
392 } catch (InterruptedException e) {
393 log.warn("Interrupted while fetching bits for application {}", app.id().name());
394 }
395 }
396
397 /**
398 * Responder to requests for application bits.
399 */
400 private class InternalBitServer implements ClusterMessageHandler {
401 @Override
402 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800403 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800404 try {
405 message.respond(toByteArray(getApplicationInputStream(name)));
406 } catch (Exception e) {
407 log.debug("Unable to read bits for application {}", name);
408 }
409 }
410 }
Thomas Vachuskad0d58542015-06-03 12:38:44 -0700411
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800412 /**
413 * Prunes applications which are not in the map, but are on disk.
414 */
415 private void pruneUninstalledApps() {
416 for (String name : getApplicationNames()) {
417 if (getApplication(getId(name)) == null) {
418 Application app = registerApp(getApplicationDescription(name));
419 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
420 purgeApplication(app.id().name());
421 }
422 }
423 }
424
425 /**
426 * Produces a registered application from the supplied description.
427 */
428 private Application registerApp(ApplicationDescription appDesc) {
429 ApplicationId appId = idStore.registerApplication(appDesc.name());
430 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
Changhoon Yoonbdeb88a2015-05-12 20:35:31 +0900431 appDesc.origin(), appDesc.role(), appDesc.permissions(),
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800432 appDesc.featuresRepo(), appDesc.features());
433 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800434}