blob: 935561d44011fc7c6ea027849148cc4f8821ec8e [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080046import org.onosproject.store.ecmap.EventuallyConsistentMap;
47import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
48import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
49import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
Thomas Vachuskacf960112015-03-06 22:36:51 -080050import org.onosproject.store.impl.ClockService;
51import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080052import org.onosproject.store.impl.WallclockClockManager;
53import org.onosproject.store.serializers.KryoNamespaces;
54import org.slf4j.Logger;
55
56import java.io.ByteArrayInputStream;
57import java.io.IOException;
58import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.Set;
60import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080061import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080062import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskacf960112015-03-06 22:36:51 -080064import java.util.concurrent.atomic.AtomicLong;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080065
66import static com.google.common.io.ByteStreams.toByteArray;
67import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080068import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080069import static org.onosproject.app.ApplicationEvent.Type.*;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080070import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart77bdd262015-02-03 09:07:48 -080071import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
72import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080073import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Manages inventory of applications in a distributed data store that uses
77 * optimistic replication and gossip based anti-entropy techniques.
78 */
79@Component(immediate = true)
80@Service
81public class GossipApplicationStore extends ApplicationArchive
82 implements ApplicationStore {
83
84 private final Logger log = getLogger(getClass());
85
86 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
87
88 private static final int FETCH_TIMEOUT_MS = 10_000;
89 private static final int LOAD_TIMEOUT_MS = 5_000;
90
91 public enum InternalState {
92 INSTALLED, ACTIVATED, DEACTIVATED
93 }
94
Madan Jampani6b5b7172015-02-23 13:02:26 -080095 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080096 private ExecutorService messageHandlingExecutor;
97
Thomas Vachuska90b453f2015-01-30 18:57:14 -080098 private EventuallyConsistentMap<ApplicationId, Application> apps;
99 private EventuallyConsistentMap<Application, InternalState> states;
100 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClusterCommunicationService clusterCommunicator;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ClusterService clusterService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ApplicationIdStore idStore;
110
Thomas Vachuskacf960112015-03-06 22:36:51 -0800111 private final AtomicLong sequence = new AtomicLong();
112
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800113 @Activate
114 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800115 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800116 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800117 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118 .register(InternalState.class);
119
Madan Jampani6b5b7172015-02-23 13:02:26 -0800120 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
121
Madan Jampani2af244a2015-02-22 13:12:01 -0800122 messageHandlingExecutor = Executors.newSingleThreadExecutor(
123 groupedThreads("onos/store/app", "message-handler"));
124
125 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800126
Thomas Vachuskacf960112015-03-06 22:36:51 -0800127 // FIXME: Consider consolidating into a single map.
128
129 ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
130 new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
131 sequence.incrementAndGet());
132
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800133 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
134 clusterCommunicator,
Thomas Vachuskacf960112015-03-06 22:36:51 -0800135 serializer,
136 appsClockService);
137
138 ClockService<Application, InternalState> statesClockService = (app, state) ->
139 new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
140 sequence.incrementAndGet());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800141
142 states = new EventuallyConsistentMapImpl<>("app-states",
143 clusterService,
144 clusterCommunicator,
Thomas Vachuskacf960112015-03-06 22:36:51 -0800145 serializer,
146 statesClockService);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800147 states.addListener(new InternalAppStatesListener());
148
149 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
150 clusterService,
151 clusterCommunicator,
Thomas Vachuskacf960112015-03-06 22:36:51 -0800152 serializer,
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800153 new WallclockClockManager<>());
154
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800155 log.info("Started");
156 }
157
Thomas Vachuskacf960112015-03-06 22:36:51 -0800158 /**
159 * Loads the application inventory from the disk and activates apps if
160 * they are marked to be active.
161 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800162 private void loadFromDisk() {
163 for (String name : getApplicationNames()) {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800164 Application app = create(getApplicationDescription(name));
165 if (app != null && isActive(app.id().name())) {
166 activate(app.id());
167 // load app permissions
168 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800169 }
170 }
171
172 @Deactivate
173 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800174 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
175 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800176 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800177 apps.destroy();
178 states.destroy();
179 permissions.destroy();
180 log.info("Stopped");
181 }
182
183 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800184 public void setDelegate(ApplicationStoreDelegate delegate) {
185 super.setDelegate(delegate);
186 loadFromDisk();
187// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
188 }
189
190 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800191 public Set<Application> getApplications() {
192 return ImmutableSet.copyOf(apps.values());
193 }
194
195 @Override
196 public ApplicationId getId(String name) {
197 return idStore.getAppId(name);
198 }
199
200 @Override
201 public Application getApplication(ApplicationId appId) {
202 return apps.get(appId);
203 }
204
205 @Override
206 public ApplicationState getState(ApplicationId appId) {
207 Application app = apps.get(appId);
208 InternalState s = app == null ? null : states.get(app);
209 return s == null ? null : s == ACTIVATED ?
210 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
211 }
212
213 @Override
214 public Application create(InputStream appDescStream) {
215 ApplicationDescription appDesc = saveApplication(appDescStream);
216 return create(appDesc);
217 }
218
219 private Application create(ApplicationDescription appDesc) {
220 Application app = registerApp(appDesc);
221 apps.put(app.id(), app);
222 states.put(app, INSTALLED);
223 return app;
224 }
225
226 @Override
227 public void remove(ApplicationId appId) {
228 Application app = apps.get(appId);
229 if (app != null) {
230 apps.remove(appId);
231 states.remove(app);
232 permissions.remove(app);
233 }
234 }
235
236 @Override
237 public void activate(ApplicationId appId) {
238 Application app = apps.get(appId);
239 if (app != null) {
240 states.put(app, ACTIVATED);
241 }
242 }
243
244 @Override
245 public void deactivate(ApplicationId appId) {
246 Application app = apps.get(appId);
247 if (app != null) {
248 states.put(app, DEACTIVATED);
249 }
250 }
251
252 @Override
253 public Set<Permission> getPermissions(ApplicationId appId) {
254 Application app = apps.get(appId);
255 return app != null ? permissions.get(app) : null;
256 }
257
258 @Override
259 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
260 Application app = getApplication(appId);
261 if (app != null) {
262 this.permissions.put(app, permissions);
263 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
264 }
265 }
266
267 /**
268 * Listener to application state distributed map changes.
269 */
270 private final class InternalAppStatesListener
271 implements EventuallyConsistentMapListener<Application, InternalState> {
272 @Override
273 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
Thomas Vachuska4fcdae72015-03-24 10:22:54 -0700274 // If we do not have a delegate, refuse to process any events entirely.
275 // This is to allow the anti-entropy to kick in and process the events
276 // perhaps a bit later, but with opportunity to notify delegate.
277 if (delegate == null) {
278 return;
279 }
280
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800281 Application app = event.key();
282 InternalState state = event.value();
283
284 if (event.type() == PUT) {
285 if (state == INSTALLED) {
286 fetchBitsIfNeeded(app);
287 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
288
289 } else if (state == ACTIVATED) {
290 installAppIfNeeded(app);
291 setActive(app.id().name());
292 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
293
294 } else if (state == DEACTIVATED) {
295 clearActive(app.id().name());
296 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
297 }
298 } else if (event.type() == REMOVE) {
299 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
300 purgeApplication(app.id().name());
301 }
302 }
303 }
304
305 /**
306 * Determines if the application bits are available locally.
307 */
308 private boolean appBitsAvailable(Application app) {
309 try {
310 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
311 return appDesc.version().equals(app.version());
312 } catch (ApplicationException e) {
313 return false;
314 }
315 }
316
317 /**
318 * Fetches the bits from the cluster peers if necessary.
319 */
320 private void fetchBitsIfNeeded(Application app) {
321 if (!appBitsAvailable(app)) {
322 fetchBits(app);
323 }
324 }
325
326 /**
327 * Installs the application if necessary from the application peers.
328 */
329 private void installAppIfNeeded(Application app) {
330 if (!appBitsAvailable(app)) {
331 fetchBits(app);
332 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
333 }
334 }
335
336 /**
337 * Fetches the bits from the cluster peers.
338 */
339 private void fetchBits(Application app) {
340 ControllerNode localNode = clusterService.getLocalNode();
341 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800342 app.id().name().getBytes(Charsets.UTF_8));
343 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800344 CountDownLatch latch = new CountDownLatch(1);
345
346 // FIXME: send message with name & version to make sure we don't get served old bits
347
348 log.info("Downloading bits for application {}", app.id().name());
349 for (ControllerNode node : clusterService.getNodes()) {
350 try {
351 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
352 future.addListener(new InternalBitListener(app, node, future, latch), executor);
353 } catch (IOException e) {
354 log.debug("Unable to request bits for application {} from node {}",
355 app.id().name(), node.id());
356 }
357 }
358
359 try {
360 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
361 log.warn("Unable to fetch bits for application {}", app.id().name());
362 }
363 } catch (InterruptedException e) {
364 log.warn("Interrupted while fetching bits for application {}", app.id().name());
365 }
366 }
367
368 /**
369 * Responder to requests for application bits.
370 */
371 private class InternalBitServer implements ClusterMessageHandler {
372 @Override
373 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800374 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800375 try {
376 message.respond(toByteArray(getApplicationInputStream(name)));
377 } catch (Exception e) {
378 log.debug("Unable to read bits for application {}", name);
379 }
380 }
381 }
382
383 /**
384 * Processes completed fetch requests.
385 */
386 private class InternalBitListener implements Runnable {
387 private final Application app;
388 private final ControllerNode node;
389 private final ListenableFuture<byte[]> future;
390 private final CountDownLatch latch;
391
392 public InternalBitListener(Application app, ControllerNode node,
393 ListenableFuture<byte[]> future, CountDownLatch latch) {
394 this.app = app;
395 this.node = node;
396 this.future = future;
397 this.latch = latch;
398 }
399
400 @Override
401 public void run() {
402 if (latch.getCount() > 0 && !future.isCancelled()) {
403 try {
404 byte[] bits = future.get(1, MILLISECONDS);
405 saveApplication(new ByteArrayInputStream(bits));
406 log.info("Downloaded bits for application {} from node {}",
407 app.id().name(), node.id());
408 latch.countDown();
409 } catch (Exception e) {
410 log.warn("Unable to fetch bits for application {} from node {}",
411 app.id().name(), node.id());
412 }
413 }
414 }
415 }
416
417 /**
418 * Prunes applications which are not in the map, but are on disk.
419 */
420 private void pruneUninstalledApps() {
421 for (String name : getApplicationNames()) {
422 if (getApplication(getId(name)) == null) {
423 Application app = registerApp(getApplicationDescription(name));
424 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
425 purgeApplication(app.id().name());
426 }
427 }
428 }
429
430 /**
431 * Produces a registered application from the supplied description.
432 */
433 private Application registerApp(ApplicationDescription appDesc) {
434 ApplicationId appId = idStore.registerApplication(appDesc.name());
435 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
436 appDesc.origin(), appDesc.permissions(),
437 appDesc.featuresRepo(), appDesc.features());
438 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800439
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800440}
441