blob: d8ce7d561bd003c05b78bd8e48e659d926f87abf [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
Thomas Vachuskacf960112015-03-06 22:36:51 -080033import org.onosproject.app.ApplicationStoreDelegate;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080046import org.onosproject.store.ecmap.EventuallyConsistentMap;
47import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
48import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
49import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
Thomas Vachuskacf960112015-03-06 22:36:51 -080050import org.onosproject.store.impl.ClockService;
51import org.onosproject.store.impl.MultiValuedTimestamp;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080052import org.onosproject.store.impl.WallclockClockManager;
53import org.onosproject.store.serializers.KryoNamespaces;
54import org.slf4j.Logger;
55
56import java.io.ByteArrayInputStream;
57import java.io.IOException;
58import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080059import java.util.Set;
60import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080061import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080062import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskacf960112015-03-06 22:36:51 -080064import java.util.concurrent.atomic.AtomicLong;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080065
66import static com.google.common.io.ByteStreams.toByteArray;
67import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080068import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080069import static org.onosproject.app.ApplicationEvent.Type.*;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080070import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart77bdd262015-02-03 09:07:48 -080071import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
72import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080073import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Manages inventory of applications in a distributed data store that uses
77 * optimistic replication and gossip based anti-entropy techniques.
78 */
79@Component(immediate = true)
80@Service
81public class GossipApplicationStore extends ApplicationArchive
82 implements ApplicationStore {
83
84 private final Logger log = getLogger(getClass());
85
86 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
87
88 private static final int FETCH_TIMEOUT_MS = 10_000;
89 private static final int LOAD_TIMEOUT_MS = 5_000;
90
91 public enum InternalState {
92 INSTALLED, ACTIVATED, DEACTIVATED
93 }
94
Madan Jampani6b5b7172015-02-23 13:02:26 -080095 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080096 private ExecutorService messageHandlingExecutor;
97
Thomas Vachuska90b453f2015-01-30 18:57:14 -080098 private EventuallyConsistentMap<ApplicationId, Application> apps;
99 private EventuallyConsistentMap<Application, InternalState> states;
100 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClusterCommunicationService clusterCommunicator;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ClusterService clusterService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ApplicationIdStore idStore;
110
Thomas Vachuskacf960112015-03-06 22:36:51 -0800111 private final AtomicLong sequence = new AtomicLong();
112
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800113 @Activate
114 public void activate() {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800115 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800116 .register(KryoNamespaces.API)
Thomas Vachuskacf960112015-03-06 22:36:51 -0800117 .register(MultiValuedTimestamp.class)
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800118 .register(InternalState.class);
119
Madan Jampani6b5b7172015-02-23 13:02:26 -0800120 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
121
Madan Jampani2af244a2015-02-22 13:12:01 -0800122 messageHandlingExecutor = Executors.newSingleThreadExecutor(
123 groupedThreads("onos/store/app", "message-handler"));
124
125 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800126
Thomas Vachuskacf960112015-03-06 22:36:51 -0800127 // FIXME: Consider consolidating into a single map.
128
129 ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
130 new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
131 sequence.incrementAndGet());
132
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800133 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
134 clusterCommunicator,
Thomas Vachuskacf960112015-03-06 22:36:51 -0800135 serializer,
136 appsClockService);
137
138 ClockService<Application, InternalState> statesClockService = (app, state) ->
139 new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
140 sequence.incrementAndGet());
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800141
142 states = new EventuallyConsistentMapImpl<>("app-states",
143 clusterService,
144 clusterCommunicator,
Thomas Vachuskacf960112015-03-06 22:36:51 -0800145 serializer,
146 statesClockService);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800147 states.addListener(new InternalAppStatesListener());
148
149 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
150 clusterService,
151 clusterCommunicator,
Thomas Vachuskacf960112015-03-06 22:36:51 -0800152 serializer,
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800153 new WallclockClockManager<>());
154
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800155 log.info("Started");
156 }
157
Thomas Vachuskacf960112015-03-06 22:36:51 -0800158 /**
159 * Loads the application inventory from the disk and activates apps if
160 * they are marked to be active.
161 */
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800162 private void loadFromDisk() {
163 for (String name : getApplicationNames()) {
Thomas Vachuskacf960112015-03-06 22:36:51 -0800164 Application app = create(getApplicationDescription(name));
165 if (app != null && isActive(app.id().name())) {
166 activate(app.id());
167 // load app permissions
168 }
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800169 }
170 }
171
172 @Deactivate
173 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800174 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
175 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800176 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800177 apps.destroy();
178 states.destroy();
179 permissions.destroy();
180 log.info("Stopped");
181 }
182
183 @Override
Thomas Vachuskacf960112015-03-06 22:36:51 -0800184 public void setDelegate(ApplicationStoreDelegate delegate) {
185 super.setDelegate(delegate);
186 loadFromDisk();
187// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
188 }
189
190 @Override
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800191 public Set<Application> getApplications() {
192 return ImmutableSet.copyOf(apps.values());
193 }
194
195 @Override
196 public ApplicationId getId(String name) {
197 return idStore.getAppId(name);
198 }
199
200 @Override
201 public Application getApplication(ApplicationId appId) {
202 return apps.get(appId);
203 }
204
205 @Override
206 public ApplicationState getState(ApplicationId appId) {
207 Application app = apps.get(appId);
208 InternalState s = app == null ? null : states.get(app);
209 return s == null ? null : s == ACTIVATED ?
210 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
211 }
212
213 @Override
214 public Application create(InputStream appDescStream) {
215 ApplicationDescription appDesc = saveApplication(appDescStream);
216 return create(appDesc);
217 }
218
219 private Application create(ApplicationDescription appDesc) {
220 Application app = registerApp(appDesc);
221 apps.put(app.id(), app);
222 states.put(app, INSTALLED);
223 return app;
224 }
225
226 @Override
227 public void remove(ApplicationId appId) {
228 Application app = apps.get(appId);
229 if (app != null) {
230 apps.remove(appId);
231 states.remove(app);
232 permissions.remove(app);
233 }
234 }
235
236 @Override
237 public void activate(ApplicationId appId) {
238 Application app = apps.get(appId);
239 if (app != null) {
240 states.put(app, ACTIVATED);
241 }
242 }
243
244 @Override
245 public void deactivate(ApplicationId appId) {
246 Application app = apps.get(appId);
247 if (app != null) {
248 states.put(app, DEACTIVATED);
249 }
250 }
251
252 @Override
253 public Set<Permission> getPermissions(ApplicationId appId) {
254 Application app = apps.get(appId);
255 return app != null ? permissions.get(app) : null;
256 }
257
258 @Override
259 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
260 Application app = getApplication(appId);
261 if (app != null) {
262 this.permissions.put(app, permissions);
263 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
264 }
265 }
266
267 /**
268 * Listener to application state distributed map changes.
269 */
270 private final class InternalAppStatesListener
271 implements EventuallyConsistentMapListener<Application, InternalState> {
272 @Override
273 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
274 Application app = event.key();
275 InternalState state = event.value();
276
277 if (event.type() == PUT) {
278 if (state == INSTALLED) {
279 fetchBitsIfNeeded(app);
280 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
281
282 } else if (state == ACTIVATED) {
283 installAppIfNeeded(app);
284 setActive(app.id().name());
285 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
286
287 } else if (state == DEACTIVATED) {
288 clearActive(app.id().name());
289 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
290 }
291 } else if (event.type() == REMOVE) {
292 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
293 purgeApplication(app.id().name());
294 }
295 }
296 }
297
298 /**
299 * Determines if the application bits are available locally.
300 */
301 private boolean appBitsAvailable(Application app) {
302 try {
303 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
304 return appDesc.version().equals(app.version());
305 } catch (ApplicationException e) {
306 return false;
307 }
308 }
309
310 /**
311 * Fetches the bits from the cluster peers if necessary.
312 */
313 private void fetchBitsIfNeeded(Application app) {
314 if (!appBitsAvailable(app)) {
315 fetchBits(app);
316 }
317 }
318
319 /**
320 * Installs the application if necessary from the application peers.
321 */
322 private void installAppIfNeeded(Application app) {
323 if (!appBitsAvailable(app)) {
324 fetchBits(app);
325 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
326 }
327 }
328
329 /**
330 * Fetches the bits from the cluster peers.
331 */
332 private void fetchBits(Application app) {
333 ControllerNode localNode = clusterService.getLocalNode();
334 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800335 app.id().name().getBytes(Charsets.UTF_8));
336 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800337 CountDownLatch latch = new CountDownLatch(1);
338
339 // FIXME: send message with name & version to make sure we don't get served old bits
340
341 log.info("Downloading bits for application {}", app.id().name());
342 for (ControllerNode node : clusterService.getNodes()) {
343 try {
344 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
345 future.addListener(new InternalBitListener(app, node, future, latch), executor);
346 } catch (IOException e) {
347 log.debug("Unable to request bits for application {} from node {}",
348 app.id().name(), node.id());
349 }
350 }
351
352 try {
353 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
354 log.warn("Unable to fetch bits for application {}", app.id().name());
355 }
356 } catch (InterruptedException e) {
357 log.warn("Interrupted while fetching bits for application {}", app.id().name());
358 }
359 }
360
361 /**
362 * Responder to requests for application bits.
363 */
364 private class InternalBitServer implements ClusterMessageHandler {
365 @Override
366 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800367 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800368 try {
369 message.respond(toByteArray(getApplicationInputStream(name)));
370 } catch (Exception e) {
371 log.debug("Unable to read bits for application {}", name);
372 }
373 }
374 }
375
376 /**
377 * Processes completed fetch requests.
378 */
379 private class InternalBitListener implements Runnable {
380 private final Application app;
381 private final ControllerNode node;
382 private final ListenableFuture<byte[]> future;
383 private final CountDownLatch latch;
384
385 public InternalBitListener(Application app, ControllerNode node,
386 ListenableFuture<byte[]> future, CountDownLatch latch) {
387 this.app = app;
388 this.node = node;
389 this.future = future;
390 this.latch = latch;
391 }
392
393 @Override
394 public void run() {
395 if (latch.getCount() > 0 && !future.isCancelled()) {
396 try {
397 byte[] bits = future.get(1, MILLISECONDS);
398 saveApplication(new ByteArrayInputStream(bits));
399 log.info("Downloaded bits for application {} from node {}",
400 app.id().name(), node.id());
401 latch.countDown();
402 } catch (Exception e) {
403 log.warn("Unable to fetch bits for application {} from node {}",
404 app.id().name(), node.id());
405 }
406 }
407 }
408 }
409
410 /**
411 * Prunes applications which are not in the map, but are on disk.
412 */
413 private void pruneUninstalledApps() {
414 for (String name : getApplicationNames()) {
415 if (getApplication(getId(name)) == null) {
416 Application app = registerApp(getApplicationDescription(name));
417 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
418 purgeApplication(app.id().name());
419 }
420 }
421 }
422
423 /**
424 * Produces a registered application from the supplied description.
425 */
426 private Application registerApp(ApplicationDescription appDesc) {
427 ApplicationId appId = idStore.registerApplication(appDesc.name());
428 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
429 appDesc.origin(), appDesc.permissions(),
430 appDesc.featuresRepo(), appDesc.features());
431 }
Thomas Vachuskacf960112015-03-06 22:36:51 -0800432
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800433}
434