blob: 805c9cae30a900c9ce25d7d0f3e59e5aae708705 [file] [log] [blame]
Thomas Vachuska90b453f2015-01-30 18:57:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
Madan Jampani2af244a2015-02-22 13:12:01 -080021
Thomas Vachuska90b453f2015-01-30 18:57:14 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.app.ApplicationDescription;
30import org.onosproject.app.ApplicationEvent;
31import org.onosproject.app.ApplicationException;
32import org.onosproject.app.ApplicationState;
33import org.onosproject.app.ApplicationStore;
34import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.common.app.ApplicationArchive;
37import org.onosproject.core.Application;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.ApplicationIdStore;
40import org.onosproject.core.DefaultApplication;
41import org.onosproject.core.Permission;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080046import org.onosproject.store.ecmap.EventuallyConsistentMap;
47import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
48import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
49import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080050import org.onosproject.store.impl.WallclockClockManager;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.slf4j.Logger;
53
54import java.io.ByteArrayInputStream;
55import java.io.IOException;
56import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080057import java.util.Set;
58import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080059import java.util.concurrent.ExecutorService;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080060import java.util.concurrent.Executors;
61import java.util.concurrent.ScheduledExecutorService;
62
63import static com.google.common.io.ByteStreams.toByteArray;
64import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080065import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080066import static org.onosproject.app.ApplicationEvent.Type.*;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080067import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart77bdd262015-02-03 09:07:48 -080068import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
69import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080070import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Manages inventory of applications in a distributed data store that uses
74 * optimistic replication and gossip based anti-entropy techniques.
75 */
76@Component(immediate = true)
77@Service
78public class GossipApplicationStore extends ApplicationArchive
79 implements ApplicationStore {
80
81 private final Logger log = getLogger(getClass());
82
83 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
84
85 private static final int FETCH_TIMEOUT_MS = 10_000;
86 private static final int LOAD_TIMEOUT_MS = 5_000;
87
88 public enum InternalState {
89 INSTALLED, ACTIVATED, DEACTIVATED
90 }
91
Madan Jampani6b5b7172015-02-23 13:02:26 -080092 private ScheduledExecutorService executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080093 private ExecutorService messageHandlingExecutor;
94
Thomas Vachuska90b453f2015-01-30 18:57:14 -080095 private EventuallyConsistentMap<ApplicationId, Application> apps;
96 private EventuallyConsistentMap<Application, InternalState> states;
97 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected ClusterCommunicationService clusterCommunicator;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ApplicationIdStore idStore;
107
108 @Activate
109 public void activate() {
110 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
111 .register(KryoNamespaces.API)
112 .register(InternalState.class);
113
Madan Jampani6b5b7172015-02-23 13:02:26 -0800114 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
115
Madan Jampani2af244a2015-02-22 13:12:01 -0800116 messageHandlingExecutor = Executors.newSingleThreadExecutor(
117 groupedThreads("onos/store/app", "message-handler"));
118
119 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800120
121 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
122 clusterCommunicator,
123 intentSerializer,
124 new WallclockClockManager<>());
125
126 states = new EventuallyConsistentMapImpl<>("app-states",
127 clusterService,
128 clusterCommunicator,
129 intentSerializer,
130 new WallclockClockManager<>());
131 states.addListener(new InternalAppStatesListener());
132
133 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
134 clusterService,
135 clusterCommunicator,
136 intentSerializer,
137 new WallclockClockManager<>());
138
139 // FIXME: figure out load from disk; this will require resolving the dual authority problem
140
141 executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
142
143 log.info("Started");
144 }
145
146 private void loadFromDisk() {
147 for (String name : getApplicationNames()) {
148 create(getApplicationDescription(name));
149 // load app permissions
150 }
151 }
152
153 @Deactivate
154 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800155 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
156 messageHandlingExecutor.shutdown();
Madan Jampani6b5b7172015-02-23 13:02:26 -0800157 executor.shutdown();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800158 apps.destroy();
159 states.destroy();
160 permissions.destroy();
161 log.info("Stopped");
162 }
163
164 @Override
165 public Set<Application> getApplications() {
166 return ImmutableSet.copyOf(apps.values());
167 }
168
169 @Override
170 public ApplicationId getId(String name) {
171 return idStore.getAppId(name);
172 }
173
174 @Override
175 public Application getApplication(ApplicationId appId) {
176 return apps.get(appId);
177 }
178
179 @Override
180 public ApplicationState getState(ApplicationId appId) {
181 Application app = apps.get(appId);
182 InternalState s = app == null ? null : states.get(app);
183 return s == null ? null : s == ACTIVATED ?
184 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
185 }
186
187 @Override
188 public Application create(InputStream appDescStream) {
189 ApplicationDescription appDesc = saveApplication(appDescStream);
190 return create(appDesc);
191 }
192
193 private Application create(ApplicationDescription appDesc) {
194 Application app = registerApp(appDesc);
195 apps.put(app.id(), app);
196 states.put(app, INSTALLED);
197 return app;
198 }
199
200 @Override
201 public void remove(ApplicationId appId) {
202 Application app = apps.get(appId);
203 if (app != null) {
204 apps.remove(appId);
205 states.remove(app);
206 permissions.remove(app);
207 }
208 }
209
210 @Override
211 public void activate(ApplicationId appId) {
212 Application app = apps.get(appId);
213 if (app != null) {
214 states.put(app, ACTIVATED);
215 }
216 }
217
218 @Override
219 public void deactivate(ApplicationId appId) {
220 Application app = apps.get(appId);
221 if (app != null) {
222 states.put(app, DEACTIVATED);
223 }
224 }
225
226 @Override
227 public Set<Permission> getPermissions(ApplicationId appId) {
228 Application app = apps.get(appId);
229 return app != null ? permissions.get(app) : null;
230 }
231
232 @Override
233 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
234 Application app = getApplication(appId);
235 if (app != null) {
236 this.permissions.put(app, permissions);
237 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
238 }
239 }
240
241 /**
242 * Listener to application state distributed map changes.
243 */
244 private final class InternalAppStatesListener
245 implements EventuallyConsistentMapListener<Application, InternalState> {
246 @Override
247 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
248 Application app = event.key();
249 InternalState state = event.value();
250
251 if (event.type() == PUT) {
252 if (state == INSTALLED) {
253 fetchBitsIfNeeded(app);
254 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
255
256 } else if (state == ACTIVATED) {
257 installAppIfNeeded(app);
258 setActive(app.id().name());
259 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
260
261 } else if (state == DEACTIVATED) {
262 clearActive(app.id().name());
263 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
264 }
265 } else if (event.type() == REMOVE) {
266 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
267 purgeApplication(app.id().name());
268 }
269 }
270 }
271
272 /**
273 * Determines if the application bits are available locally.
274 */
275 private boolean appBitsAvailable(Application app) {
276 try {
277 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
278 return appDesc.version().equals(app.version());
279 } catch (ApplicationException e) {
280 return false;
281 }
282 }
283
284 /**
285 * Fetches the bits from the cluster peers if necessary.
286 */
287 private void fetchBitsIfNeeded(Application app) {
288 if (!appBitsAvailable(app)) {
289 fetchBits(app);
290 }
291 }
292
293 /**
294 * Installs the application if necessary from the application peers.
295 */
296 private void installAppIfNeeded(Application app) {
297 if (!appBitsAvailable(app)) {
298 fetchBits(app);
299 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
300 }
301 }
302
303 /**
304 * Fetches the bits from the cluster peers.
305 */
306 private void fetchBits(Application app) {
307 ControllerNode localNode = clusterService.getLocalNode();
308 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800309 app.id().name().getBytes(Charsets.UTF_8));
310 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800311 CountDownLatch latch = new CountDownLatch(1);
312
313 // FIXME: send message with name & version to make sure we don't get served old bits
314
315 log.info("Downloading bits for application {}", app.id().name());
316 for (ControllerNode node : clusterService.getNodes()) {
317 try {
318 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
319 future.addListener(new InternalBitListener(app, node, future, latch), executor);
320 } catch (IOException e) {
321 log.debug("Unable to request bits for application {} from node {}",
322 app.id().name(), node.id());
323 }
324 }
325
326 try {
327 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
328 log.warn("Unable to fetch bits for application {}", app.id().name());
329 }
330 } catch (InterruptedException e) {
331 log.warn("Interrupted while fetching bits for application {}", app.id().name());
332 }
333 }
334
335 /**
336 * Responder to requests for application bits.
337 */
338 private class InternalBitServer implements ClusterMessageHandler {
339 @Override
340 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800341 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800342 try {
343 message.respond(toByteArray(getApplicationInputStream(name)));
344 } catch (Exception e) {
345 log.debug("Unable to read bits for application {}", name);
346 }
347 }
348 }
349
350 /**
351 * Processes completed fetch requests.
352 */
353 private class InternalBitListener implements Runnable {
354 private final Application app;
355 private final ControllerNode node;
356 private final ListenableFuture<byte[]> future;
357 private final CountDownLatch latch;
358
359 public InternalBitListener(Application app, ControllerNode node,
360 ListenableFuture<byte[]> future, CountDownLatch latch) {
361 this.app = app;
362 this.node = node;
363 this.future = future;
364 this.latch = latch;
365 }
366
367 @Override
368 public void run() {
369 if (latch.getCount() > 0 && !future.isCancelled()) {
370 try {
371 byte[] bits = future.get(1, MILLISECONDS);
372 saveApplication(new ByteArrayInputStream(bits));
373 log.info("Downloaded bits for application {} from node {}",
374 app.id().name(), node.id());
375 latch.countDown();
376 } catch (Exception e) {
377 log.warn("Unable to fetch bits for application {} from node {}",
378 app.id().name(), node.id());
379 }
380 }
381 }
382 }
383
384 /**
385 * Prunes applications which are not in the map, but are on disk.
386 */
387 private void pruneUninstalledApps() {
388 for (String name : getApplicationNames()) {
389 if (getApplication(getId(name)) == null) {
390 Application app = registerApp(getApplicationDescription(name));
391 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
392 purgeApplication(app.id().name());
393 }
394 }
395 }
396
397 /**
398 * Produces a registered application from the supplied description.
399 */
400 private Application registerApp(ApplicationDescription appDesc) {
401 ApplicationId appId = idStore.registerApplication(appDesc.name());
402 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
403 appDesc.origin(), appDesc.permissions(),
404 appDesc.featuresRepo(), appDesc.features());
405 }
406}
407