/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.app.ApplicationDescription;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationException;
import org.onosproject.app.ApplicationState;
import org.onosproject.app.ApplicationStore;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.common.app.ApplicationArchive;
import org.onosproject.core.Application;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.ApplicationIdStore;
import org.onosproject.core.DefaultApplication;
import org.onosproject.core.Permission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.EventuallyConsistentMap;
import org.onosproject.store.impl.EventuallyConsistentMapEvent;
import org.onosproject.store.impl.EventuallyConsistentMapImpl;
import org.onosproject.store.impl.EventuallyConsistentMapListener;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.app.ApplicationEvent.Type.*;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
import static org.onosproject.store.impl.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.impl.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
70
/**
 * Manages inventory of applications in a distributed data store that uses
 * optimistic replication and gossip-based anti-entropy techniques.
 */
75@Component(immediate = true)
76@Service
77public class GossipApplicationStore extends ApplicationArchive
78 implements ApplicationStore {
79
80 private final Logger log = getLogger(getClass());
81
82 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
83
84 private static final int FETCH_TIMEOUT_MS = 10_000;
85 private static final int LOAD_TIMEOUT_MS = 5_000;
86
/**
 * Application states as recorded in the replicated state map.
 */
public enum InternalState {
    INSTALLED,
    ACTIVATED,
    DEACTIVATED
}
90
91 private final ScheduledExecutorService executor =
92 Executors.newSingleThreadScheduledExecutor(namedThreads("onos-app-store"));
93
94 private EventuallyConsistentMap<ApplicationId, Application> apps;
95 private EventuallyConsistentMap<Application, InternalState> states;
96 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService clusterCommunicator;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterService clusterService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ApplicationIdStore idStore;
106
107 @Activate
108 public void activate() {
109 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
110 .register(KryoNamespaces.API)
111 .register(InternalState.class);
112
113 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
114
115 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
116 clusterCommunicator,
117 intentSerializer,
118 new WallclockClockManager<>());
119
120 states = new EventuallyConsistentMapImpl<>("app-states",
121 clusterService,
122 clusterCommunicator,
123 intentSerializer,
124 new WallclockClockManager<>());
125 states.addListener(new InternalAppStatesListener());
126
127 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
128 clusterService,
129 clusterCommunicator,
130 intentSerializer,
131 new WallclockClockManager<>());
132
133 // FIXME: figure out load from disk; this will require resolving the dual authority problem
134
135 executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
136
137 log.info("Started");
138 }
139
140 private void loadFromDisk() {
141 for (String name : getApplicationNames()) {
142 create(getApplicationDescription(name));
143 // load app permissions
144 }
145 }
146
147 @Deactivate
148 public void deactivate() {
149 apps.destroy();
150 states.destroy();
151 permissions.destroy();
152 log.info("Stopped");
153 }
154
155 @Override
156 public Set<Application> getApplications() {
157 return ImmutableSet.copyOf(apps.values());
158 }
159
160 @Override
161 public ApplicationId getId(String name) {
162 return idStore.getAppId(name);
163 }
164
165 @Override
166 public Application getApplication(ApplicationId appId) {
167 return apps.get(appId);
168 }
169
170 @Override
171 public ApplicationState getState(ApplicationId appId) {
172 Application app = apps.get(appId);
173 InternalState s = app == null ? null : states.get(app);
174 return s == null ? null : s == ACTIVATED ?
175 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
176 }
177
178 @Override
179 public Application create(InputStream appDescStream) {
180 ApplicationDescription appDesc = saveApplication(appDescStream);
181 return create(appDesc);
182 }
183
184 private Application create(ApplicationDescription appDesc) {
185 Application app = registerApp(appDesc);
186 apps.put(app.id(), app);
187 states.put(app, INSTALLED);
188 return app;
189 }
190
191 @Override
192 public void remove(ApplicationId appId) {
193 Application app = apps.get(appId);
194 if (app != null) {
195 apps.remove(appId);
196 states.remove(app);
197 permissions.remove(app);
198 }
199 }
200
201 @Override
202 public void activate(ApplicationId appId) {
203 Application app = apps.get(appId);
204 if (app != null) {
205 states.put(app, ACTIVATED);
206 }
207 }
208
209 @Override
210 public void deactivate(ApplicationId appId) {
211 Application app = apps.get(appId);
212 if (app != null) {
213 states.put(app, DEACTIVATED);
214 }
215 }
216
217 @Override
218 public Set<Permission> getPermissions(ApplicationId appId) {
219 Application app = apps.get(appId);
220 return app != null ? permissions.get(app) : null;
221 }
222
223 @Override
224 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
225 Application app = getApplication(appId);
226 if (app != null) {
227 this.permissions.put(app, permissions);
228 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
229 }
230 }
231
232 /**
233 * Listener to application state distributed map changes.
234 */
235 private final class InternalAppStatesListener
236 implements EventuallyConsistentMapListener<Application, InternalState> {
237 @Override
238 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
239 Application app = event.key();
240 InternalState state = event.value();
241
242 if (event.type() == PUT) {
243 if (state == INSTALLED) {
244 fetchBitsIfNeeded(app);
245 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
246
247 } else if (state == ACTIVATED) {
248 installAppIfNeeded(app);
249 setActive(app.id().name());
250 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
251
252 } else if (state == DEACTIVATED) {
253 clearActive(app.id().name());
254 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
255 }
256 } else if (event.type() == REMOVE) {
257 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
258 purgeApplication(app.id().name());
259 }
260 }
261 }
262
263 /**
264 * Determines if the application bits are available locally.
265 */
266 private boolean appBitsAvailable(Application app) {
267 try {
268 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
269 return appDesc.version().equals(app.version());
270 } catch (ApplicationException e) {
271 return false;
272 }
273 }
274
275 /**
276 * Fetches the bits from the cluster peers if necessary.
277 */
278 private void fetchBitsIfNeeded(Application app) {
279 if (!appBitsAvailable(app)) {
280 fetchBits(app);
281 }
282 }
283
284 /**
285 * Installs the application if necessary from the application peers.
286 */
287 private void installAppIfNeeded(Application app) {
288 if (!appBitsAvailable(app)) {
289 fetchBits(app);
290 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
291 }
292 }
293
294 /**
295 * Fetches the bits from the cluster peers.
296 */
297 private void fetchBits(Application app) {
298 ControllerNode localNode = clusterService.getLocalNode();
299 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
300 app.id().name().getBytes());
301 Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
302 CountDownLatch latch = new CountDownLatch(1);
303
304 // FIXME: send message with name & version to make sure we don't get served old bits
305
306 log.info("Downloading bits for application {}", app.id().name());
307 for (ControllerNode node : clusterService.getNodes()) {
308 try {
309 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
310 future.addListener(new InternalBitListener(app, node, future, latch), executor);
311 } catch (IOException e) {
312 log.debug("Unable to request bits for application {} from node {}",
313 app.id().name(), node.id());
314 }
315 }
316
317 try {
318 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
319 log.warn("Unable to fetch bits for application {}", app.id().name());
320 }
321 } catch (InterruptedException e) {
322 log.warn("Interrupted while fetching bits for application {}", app.id().name());
323 }
324 }
325
326 /**
327 * Responder to requests for application bits.
328 */
329 private class InternalBitServer implements ClusterMessageHandler {
330 @Override
331 public void handle(ClusterMessage message) {
332 String name = new String(message.payload());
333 try {
334 message.respond(toByteArray(getApplicationInputStream(name)));
335 } catch (Exception e) {
336 log.debug("Unable to read bits for application {}", name);
337 }
338 }
339 }
340
341 /**
342 * Processes completed fetch requests.
343 */
344 private class InternalBitListener implements Runnable {
345 private final Application app;
346 private final ControllerNode node;
347 private final ListenableFuture<byte[]> future;
348 private final CountDownLatch latch;
349
350 public InternalBitListener(Application app, ControllerNode node,
351 ListenableFuture<byte[]> future, CountDownLatch latch) {
352 this.app = app;
353 this.node = node;
354 this.future = future;
355 this.latch = latch;
356 }
357
358 @Override
359 public void run() {
360 if (latch.getCount() > 0 && !future.isCancelled()) {
361 try {
362 byte[] bits = future.get(1, MILLISECONDS);
363 saveApplication(new ByteArrayInputStream(bits));
364 log.info("Downloaded bits for application {} from node {}",
365 app.id().name(), node.id());
366 latch.countDown();
367 } catch (Exception e) {
368 log.warn("Unable to fetch bits for application {} from node {}",
369 app.id().name(), node.id());
370 }
371 }
372 }
373 }
374
375 /**
376 * Prunes applications which are not in the map, but are on disk.
377 */
378 private void pruneUninstalledApps() {
379 for (String name : getApplicationNames()) {
380 if (getApplication(getId(name)) == null) {
381 Application app = registerApp(getApplicationDescription(name));
382 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
383 purgeApplication(app.id().name());
384 }
385 }
386 }
387
388 /**
389 * Produces a registered application from the supplied description.
390 */
391 private Application registerApp(ApplicationDescription appDesc) {
392 ApplicationId appId = idStore.registerApplication(appDesc.name());
393 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
394 appDesc.origin(), appDesc.permissions(),
395 appDesc.featuresRepo(), appDesc.features());
396 }
397}
398