blob: 1c3c0987447b18cc2d544458785e616d092032b9 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
Madan Jampani2af244a2015-02-22 13:12:01 -080021
Thomas Vachuska90b453f2015-01-30 18:57:14 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.app.ApplicationDescription;
30import org.onosproject.app.ApplicationEvent;
31import org.onosproject.app.ApplicationException;
32import org.onosproject.app.ApplicationState;
33import org.onosproject.app.ApplicationStore;
34import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080046import org.onosproject.store.ecmap.EventuallyConsistentMap;
47import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
48import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
49import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080050import org.onosproject.store.impl.WallclockClockManager;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.slf4j.Logger;
53
54import java.io.ByteArrayInputStream;
55import java.io.IOException;
56import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080057import java.util.Set;
58import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080059import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.util.concurrent.Executors;
61import java.util.concurrent.ScheduledExecutorService;
62
63import static com.google.common.io.ByteStreams.toByteArray;
64import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080065import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080066import static org.onosproject.app.ApplicationEvent.Type.*;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080067import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart77bdd262015-02-03 09:07:48 -080068import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
69import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080070import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Manages inventory of applications in a distributed data store that uses
74 * optimistic replication and gossip based anti-entropy techniques.
75 */
76@Component(immediate = true)
77@Service
78public class GossipApplicationStore extends ApplicationArchive
79 implements ApplicationStore {
80
81 private final Logger log = getLogger(getClass());
82
83 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
84
85 private static final int FETCH_TIMEOUT_MS = 10_000;
86 private static final int LOAD_TIMEOUT_MS = 5_000;
87
88 public enum InternalState {
89 INSTALLED, ACTIVATED, DEACTIVATED
90 }
91
92 private final ScheduledExecutorService executor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080093 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
Thomas Vachuska90b453f2015-01-30 18:57:14 -080094
Madan Jampani2af244a2015-02-22 13:12:01 -080095 private ExecutorService messageHandlingExecutor;
96
Thomas Vachuska90b453f2015-01-30 18:57:14 -080097 private EventuallyConsistentMap<ApplicationId, Application> apps;
98 private EventuallyConsistentMap<Application, InternalState> states;
99 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterCommunicationService clusterCommunicator;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ApplicationIdStore idStore;
109
110 @Activate
111 public void activate() {
112 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
113 .register(KryoNamespaces.API)
114 .register(InternalState.class);
115
Madan Jampani2af244a2015-02-22 13:12:01 -0800116 messageHandlingExecutor = Executors.newSingleThreadExecutor(
117 groupedThreads("onos/store/app", "message-handler"));
118
119 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800120
121 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
122 clusterCommunicator,
123 intentSerializer,
124 new WallclockClockManager<>());
125
126 states = new EventuallyConsistentMapImpl<>("app-states",
127 clusterService,
128 clusterCommunicator,
129 intentSerializer,
130 new WallclockClockManager<>());
131 states.addListener(new InternalAppStatesListener());
132
133 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
134 clusterService,
135 clusterCommunicator,
136 intentSerializer,
137 new WallclockClockManager<>());
138
139 // FIXME: figure out load from disk; this will require resolving the dual authority problem
140
141 executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
142
143 log.info("Started");
144 }
145
146 private void loadFromDisk() {
147 for (String name : getApplicationNames()) {
148 create(getApplicationDescription(name));
149 // load app permissions
150 }
151 }
152
153 @Deactivate
154 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800155 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
156 messageHandlingExecutor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800157 apps.destroy();
158 states.destroy();
159 permissions.destroy();
160 log.info("Stopped");
161 }
162
163 @Override
164 public Set<Application> getApplications() {
165 return ImmutableSet.copyOf(apps.values());
166 }
167
168 @Override
169 public ApplicationId getId(String name) {
170 return idStore.getAppId(name);
171 }
172
173 @Override
174 public Application getApplication(ApplicationId appId) {
175 return apps.get(appId);
176 }
177
178 @Override
179 public ApplicationState getState(ApplicationId appId) {
180 Application app = apps.get(appId);
181 InternalState s = app == null ? null : states.get(app);
182 return s == null ? null : s == ACTIVATED ?
183 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
184 }
185
186 @Override
187 public Application create(InputStream appDescStream) {
188 ApplicationDescription appDesc = saveApplication(appDescStream);
189 return create(appDesc);
190 }
191
192 private Application create(ApplicationDescription appDesc) {
193 Application app = registerApp(appDesc);
194 apps.put(app.id(), app);
195 states.put(app, INSTALLED);
196 return app;
197 }
198
199 @Override
200 public void remove(ApplicationId appId) {
201 Application app = apps.get(appId);
202 if (app != null) {
203 apps.remove(appId);
204 states.remove(app);
205 permissions.remove(app);
206 }
207 }
208
209 @Override
210 public void activate(ApplicationId appId) {
211 Application app = apps.get(appId);
212 if (app != null) {
213 states.put(app, ACTIVATED);
214 }
215 }
216
217 @Override
218 public void deactivate(ApplicationId appId) {
219 Application app = apps.get(appId);
220 if (app != null) {
221 states.put(app, DEACTIVATED);
222 }
223 }
224
225 @Override
226 public Set<Permission> getPermissions(ApplicationId appId) {
227 Application app = apps.get(appId);
228 return app != null ? permissions.get(app) : null;
229 }
230
231 @Override
232 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
233 Application app = getApplication(appId);
234 if (app != null) {
235 this.permissions.put(app, permissions);
236 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
237 }
238 }
239
240 /**
241 * Listener to application state distributed map changes.
242 */
243 private final class InternalAppStatesListener
244 implements EventuallyConsistentMapListener<Application, InternalState> {
245 @Override
246 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
247 Application app = event.key();
248 InternalState state = event.value();
249
250 if (event.type() == PUT) {
251 if (state == INSTALLED) {
252 fetchBitsIfNeeded(app);
253 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
254
255 } else if (state == ACTIVATED) {
256 installAppIfNeeded(app);
257 setActive(app.id().name());
258 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
259
260 } else if (state == DEACTIVATED) {
261 clearActive(app.id().name());
262 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
263 }
264 } else if (event.type() == REMOVE) {
265 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
266 purgeApplication(app.id().name());
267 }
268 }
269 }
270
271 /**
272 * Determines if the application bits are available locally.
273 */
274 private boolean appBitsAvailable(Application app) {
275 try {
276 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
277 return appDesc.version().equals(app.version());
278 } catch (ApplicationException e) {
279 return false;
280 }
281 }
282
283 /**
284 * Fetches the bits from the cluster peers if necessary.
285 */
286 private void fetchBitsIfNeeded(Application app) {
287 if (!appBitsAvailable(app)) {
288 fetchBits(app);
289 }
290 }
291
292 /**
293 * Installs the application if necessary from the application peers.
294 */
295 private void installAppIfNeeded(Application app) {
296 if (!appBitsAvailable(app)) {
297 fetchBits(app);
298 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
299 }
300 }
301
302 /**
303 * Fetches the bits from the cluster peers.
304 */
305 private void fetchBits(Application app) {
306 ControllerNode localNode = clusterService.getLocalNode();
307 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800308 app.id().name().getBytes(Charsets.UTF_8));
309 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800310 CountDownLatch latch = new CountDownLatch(1);
311
312 // FIXME: send message with name & version to make sure we don't get served old bits
313
314 log.info("Downloading bits for application {}", app.id().name());
315 for (ControllerNode node : clusterService.getNodes()) {
316 try {
317 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
318 future.addListener(new InternalBitListener(app, node, future, latch), executor);
319 } catch (IOException e) {
320 log.debug("Unable to request bits for application {} from node {}",
321 app.id().name(), node.id());
322 }
323 }
324
325 try {
326 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
327 log.warn("Unable to fetch bits for application {}", app.id().name());
328 }
329 } catch (InterruptedException e) {
330 log.warn("Interrupted while fetching bits for application {}", app.id().name());
331 }
332 }
333
334 /**
335 * Responder to requests for application bits.
336 */
337 private class InternalBitServer implements ClusterMessageHandler {
338 @Override
339 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800340 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800341 try {
342 message.respond(toByteArray(getApplicationInputStream(name)));
343 } catch (Exception e) {
344 log.debug("Unable to read bits for application {}", name);
345 }
346 }
347 }
348
349 /**
350 * Processes completed fetch requests.
351 */
352 private class InternalBitListener implements Runnable {
353 private final Application app;
354 private final ControllerNode node;
355 private final ListenableFuture<byte[]> future;
356 private final CountDownLatch latch;
357
358 public InternalBitListener(Application app, ControllerNode node,
359 ListenableFuture<byte[]> future, CountDownLatch latch) {
360 this.app = app;
361 this.node = node;
362 this.future = future;
363 this.latch = latch;
364 }
365
366 @Override
367 public void run() {
368 if (latch.getCount() > 0 && !future.isCancelled()) {
369 try {
370 byte[] bits = future.get(1, MILLISECONDS);
371 saveApplication(new ByteArrayInputStream(bits));
372 log.info("Downloaded bits for application {} from node {}",
373 app.id().name(), node.id());
374 latch.countDown();
375 } catch (Exception e) {
376 log.warn("Unable to fetch bits for application {} from node {}",
377 app.id().name(), node.id());
378 }
379 }
380 }
381 }
382
383 /**
384 * Prunes applications which are not in the map, but are on disk.
385 */
386 private void pruneUninstalledApps() {
387 for (String name : getApplicationNames()) {
388 if (getApplication(getId(name)) == null) {
389 Application app = registerApp(getApplicationDescription(name));
390 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
391 purgeApplication(app.id().name());
392 }
393 }
394 }
395
396 /**
397 * Produces a registered application from the supplied description.
398 */
399 private Application registerApp(ApplicationDescription appDesc) {
400 ApplicationId appId = idStore.registerApplication(appDesc.name());
401 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
402 appDesc.origin(), appDesc.permissions(),
403 appDesc.featuresRepo(), appDesc.features());
404 }
405}
406