/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
33import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.ControllerNode;
35import org.onosproject.common.app.ApplicationArchive;
36import org.onosproject.core.Application;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.ApplicationIdStore;
39import org.onosproject.core.DefaultApplication;
40import org.onosproject.core.Permission;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.ClusterMessage;
43import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
44import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080045import org.onosproject.store.ecmap.EventuallyConsistentMap;
46import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
47import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
48import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080049import org.onosproject.store.impl.WallclockClockManager;
50import org.onosproject.store.serializers.KryoNamespaces;
51import org.slf4j.Logger;
52
53import java.io.ByteArrayInputStream;
54import java.io.IOException;
55import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.util.Set;
57import java.util.concurrent.CountDownLatch;
58import java.util.concurrent.Executors;
59import java.util.concurrent.ScheduledExecutorService;
60
61import static com.google.common.io.ByteStreams.toByteArray;
62import static java.util.concurrent.TimeUnit.MILLISECONDS;
63import static org.onlab.util.Tools.namedThreads;
64import static org.onosproject.app.ApplicationEvent.Type.*;
Jonathan Hart77bdd262015-02-03 09:07:48 -080065import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
66import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
67import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
68import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
69import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080070import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Manages inventory of applications in a distributed data store that uses
74 * optimistic replication and gossip based anti-entropy techniques.
75 */
76@Component(immediate = true)
77@Service
78public class GossipApplicationStore extends ApplicationArchive
79 implements ApplicationStore {
80
81 private final Logger log = getLogger(getClass());
82
83 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
84
85 private static final int FETCH_TIMEOUT_MS = 10_000;
86 private static final int LOAD_TIMEOUT_MS = 5_000;
87
88 public enum InternalState {
89 INSTALLED, ACTIVATED, DEACTIVATED
90 }
91
92 private final ScheduledExecutorService executor =
93 Executors.newSingleThreadScheduledExecutor(namedThreads("onos-app-store"));
94
95 private EventuallyConsistentMap<ApplicationId, Application> apps;
96 private EventuallyConsistentMap<Application, InternalState> states;
97 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected ClusterCommunicationService clusterCommunicator;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ApplicationIdStore idStore;
107
108 @Activate
109 public void activate() {
110 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
111 .register(KryoNamespaces.API)
112 .register(InternalState.class);
113
114 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
115
116 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
117 clusterCommunicator,
118 intentSerializer,
119 new WallclockClockManager<>());
120
121 states = new EventuallyConsistentMapImpl<>("app-states",
122 clusterService,
123 clusterCommunicator,
124 intentSerializer,
125 new WallclockClockManager<>());
126 states.addListener(new InternalAppStatesListener());
127
128 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
129 clusterService,
130 clusterCommunicator,
131 intentSerializer,
132 new WallclockClockManager<>());
133
134 // FIXME: figure out load from disk; this will require resolving the dual authority problem
135
136 executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
137
138 log.info("Started");
139 }
140
141 private void loadFromDisk() {
142 for (String name : getApplicationNames()) {
143 create(getApplicationDescription(name));
144 // load app permissions
145 }
146 }
147
148 @Deactivate
149 public void deactivate() {
150 apps.destroy();
151 states.destroy();
152 permissions.destroy();
153 log.info("Stopped");
154 }
155
156 @Override
157 public Set<Application> getApplications() {
158 return ImmutableSet.copyOf(apps.values());
159 }
160
161 @Override
162 public ApplicationId getId(String name) {
163 return idStore.getAppId(name);
164 }
165
166 @Override
167 public Application getApplication(ApplicationId appId) {
168 return apps.get(appId);
169 }
170
171 @Override
172 public ApplicationState getState(ApplicationId appId) {
173 Application app = apps.get(appId);
174 InternalState s = app == null ? null : states.get(app);
175 return s == null ? null : s == ACTIVATED ?
176 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
177 }
178
179 @Override
180 public Application create(InputStream appDescStream) {
181 ApplicationDescription appDesc = saveApplication(appDescStream);
182 return create(appDesc);
183 }
184
185 private Application create(ApplicationDescription appDesc) {
186 Application app = registerApp(appDesc);
187 apps.put(app.id(), app);
188 states.put(app, INSTALLED);
189 return app;
190 }
191
192 @Override
193 public void remove(ApplicationId appId) {
194 Application app = apps.get(appId);
195 if (app != null) {
196 apps.remove(appId);
197 states.remove(app);
198 permissions.remove(app);
199 }
200 }
201
202 @Override
203 public void activate(ApplicationId appId) {
204 Application app = apps.get(appId);
205 if (app != null) {
206 states.put(app, ACTIVATED);
207 }
208 }
209
210 @Override
211 public void deactivate(ApplicationId appId) {
212 Application app = apps.get(appId);
213 if (app != null) {
214 states.put(app, DEACTIVATED);
215 }
216 }
217
218 @Override
219 public Set<Permission> getPermissions(ApplicationId appId) {
220 Application app = apps.get(appId);
221 return app != null ? permissions.get(app) : null;
222 }
223
224 @Override
225 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
226 Application app = getApplication(appId);
227 if (app != null) {
228 this.permissions.put(app, permissions);
229 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
230 }
231 }
232
233 /**
234 * Listener to application state distributed map changes.
235 */
236 private final class InternalAppStatesListener
237 implements EventuallyConsistentMapListener<Application, InternalState> {
238 @Override
239 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
240 Application app = event.key();
241 InternalState state = event.value();
242
243 if (event.type() == PUT) {
244 if (state == INSTALLED) {
245 fetchBitsIfNeeded(app);
246 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
247
248 } else if (state == ACTIVATED) {
249 installAppIfNeeded(app);
250 setActive(app.id().name());
251 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
252
253 } else if (state == DEACTIVATED) {
254 clearActive(app.id().name());
255 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
256 }
257 } else if (event.type() == REMOVE) {
258 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
259 purgeApplication(app.id().name());
260 }
261 }
262 }
263
264 /**
265 * Determines if the application bits are available locally.
266 */
267 private boolean appBitsAvailable(Application app) {
268 try {
269 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
270 return appDesc.version().equals(app.version());
271 } catch (ApplicationException e) {
272 return false;
273 }
274 }
275
276 /**
277 * Fetches the bits from the cluster peers if necessary.
278 */
279 private void fetchBitsIfNeeded(Application app) {
280 if (!appBitsAvailable(app)) {
281 fetchBits(app);
282 }
283 }
284
285 /**
286 * Installs the application if necessary from the application peers.
287 */
288 private void installAppIfNeeded(Application app) {
289 if (!appBitsAvailable(app)) {
290 fetchBits(app);
291 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
292 }
293 }
294
295 /**
296 * Fetches the bits from the cluster peers.
297 */
298 private void fetchBits(Application app) {
299 ControllerNode localNode = clusterService.getLocalNode();
300 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800301 app.id().name().getBytes(Charsets.UTF_8));
302 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800303 CountDownLatch latch = new CountDownLatch(1);
304
305 // FIXME: send message with name & version to make sure we don't get served old bits
306
307 log.info("Downloading bits for application {}", app.id().name());
308 for (ControllerNode node : clusterService.getNodes()) {
309 try {
310 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
311 future.addListener(new InternalBitListener(app, node, future, latch), executor);
312 } catch (IOException e) {
313 log.debug("Unable to request bits for application {} from node {}",
314 app.id().name(), node.id());
315 }
316 }
317
318 try {
319 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
320 log.warn("Unable to fetch bits for application {}", app.id().name());
321 }
322 } catch (InterruptedException e) {
323 log.warn("Interrupted while fetching bits for application {}", app.id().name());
324 }
325 }
326
327 /**
328 * Responder to requests for application bits.
329 */
330 private class InternalBitServer implements ClusterMessageHandler {
331 @Override
332 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800333 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800334 try {
335 message.respond(toByteArray(getApplicationInputStream(name)));
336 } catch (Exception e) {
337 log.debug("Unable to read bits for application {}", name);
338 }
339 }
340 }
341
342 /**
343 * Processes completed fetch requests.
344 */
345 private class InternalBitListener implements Runnable {
346 private final Application app;
347 private final ControllerNode node;
348 private final ListenableFuture<byte[]> future;
349 private final CountDownLatch latch;
350
351 public InternalBitListener(Application app, ControllerNode node,
352 ListenableFuture<byte[]> future, CountDownLatch latch) {
353 this.app = app;
354 this.node = node;
355 this.future = future;
356 this.latch = latch;
357 }
358
359 @Override
360 public void run() {
361 if (latch.getCount() > 0 && !future.isCancelled()) {
362 try {
363 byte[] bits = future.get(1, MILLISECONDS);
364 saveApplication(new ByteArrayInputStream(bits));
365 log.info("Downloaded bits for application {} from node {}",
366 app.id().name(), node.id());
367 latch.countDown();
368 } catch (Exception e) {
369 log.warn("Unable to fetch bits for application {} from node {}",
370 app.id().name(), node.id());
371 }
372 }
373 }
374 }
375
376 /**
377 * Prunes applications which are not in the map, but are on disk.
378 */
379 private void pruneUninstalledApps() {
380 for (String name : getApplicationNames()) {
381 if (getApplication(getId(name)) == null) {
382 Application app = registerApp(getApplicationDescription(name));
383 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
384 purgeApplication(app.id().name());
385 }
386 }
387 }
388
389 /**
390 * Produces a registered application from the supplied description.
391 */
392 private Application registerApp(ApplicationDescription appDesc) {
393 ApplicationId appId = idStore.registerApplication(appDesc.name());
394 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
395 appDesc.origin(), appDesc.permissions(),
396 appDesc.featuresRepo(), appDesc.features());
397 }
398}
399