blob: 0fd21d0805bb0804581c298de9d587655390e84a [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.app;
17
Yuta HIGUCHI80292052015-02-10 23:11:59 -080018import com.google.common.base.Charsets;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080019import com.google.common.collect.ImmutableSet;
20import com.google.common.util.concurrent.ListenableFuture;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.app.ApplicationDescription;
29import org.onosproject.app.ApplicationEvent;
30import org.onosproject.app.ApplicationException;
31import org.onosproject.app.ApplicationState;
32import org.onosproject.app.ApplicationStore;
33import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.ControllerNode;
35import org.onosproject.common.app.ApplicationArchive;
36import org.onosproject.core.Application;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.ApplicationIdStore;
39import org.onosproject.core.DefaultApplication;
40import org.onosproject.core.Permission;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.ClusterMessage;
43import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
44import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080045import org.onosproject.store.ecmap.EventuallyConsistentMap;
46import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
47import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
48import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080049import org.onosproject.store.impl.WallclockClockManager;
50import org.onosproject.store.serializers.KryoNamespaces;
51import org.slf4j.Logger;
52
53import java.io.ByteArrayInputStream;
54import java.io.IOException;
55import java.io.InputStream;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080056import java.util.Set;
57import java.util.concurrent.CountDownLatch;
58import java.util.concurrent.Executors;
59import java.util.concurrent.ScheduledExecutorService;
60
61import static com.google.common.io.ByteStreams.toByteArray;
62import static java.util.concurrent.TimeUnit.MILLISECONDS;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080063import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080064import static org.onosproject.app.ApplicationEvent.Type.*;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080065import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
Jonathan Hart77bdd262015-02-03 09:07:48 -080066import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
67import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
Thomas Vachuska90b453f2015-01-30 18:57:14 -080068import static org.slf4j.LoggerFactory.getLogger;
69
70/**
71 * Manages inventory of applications in a distributed data store that uses
72 * optimistic replication and gossip based anti-entropy techniques.
73 */
74@Component(immediate = true)
75@Service
76public class GossipApplicationStore extends ApplicationArchive
77 implements ApplicationStore {
78
79 private final Logger log = getLogger(getClass());
80
81 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
82
83 private static final int FETCH_TIMEOUT_MS = 10_000;
84 private static final int LOAD_TIMEOUT_MS = 5_000;
85
86 public enum InternalState {
87 INSTALLED, ACTIVATED, DEACTIVATED
88 }
89
90 private final ScheduledExecutorService executor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080091 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
Thomas Vachuska90b453f2015-01-30 18:57:14 -080092
93 private EventuallyConsistentMap<ApplicationId, Application> apps;
94 private EventuallyConsistentMap<Application, InternalState> states;
95 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected ClusterCommunicationService clusterCommunicator;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected ClusterService clusterService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected ApplicationIdStore idStore;
105
106 @Activate
107 public void activate() {
108 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
109 .register(KryoNamespaces.API)
110 .register(InternalState.class);
111
112 clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
113
114 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
115 clusterCommunicator,
116 intentSerializer,
117 new WallclockClockManager<>());
118
119 states = new EventuallyConsistentMapImpl<>("app-states",
120 clusterService,
121 clusterCommunicator,
122 intentSerializer,
123 new WallclockClockManager<>());
124 states.addListener(new InternalAppStatesListener());
125
126 permissions = new EventuallyConsistentMapImpl<>("app-permissions",
127 clusterService,
128 clusterCommunicator,
129 intentSerializer,
130 new WallclockClockManager<>());
131
132 // FIXME: figure out load from disk; this will require resolving the dual authority problem
133
134 executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
135
136 log.info("Started");
137 }
138
139 private void loadFromDisk() {
140 for (String name : getApplicationNames()) {
141 create(getApplicationDescription(name));
142 // load app permissions
143 }
144 }
145
146 @Deactivate
147 public void deactivate() {
148 apps.destroy();
149 states.destroy();
150 permissions.destroy();
151 log.info("Stopped");
152 }
153
154 @Override
155 public Set<Application> getApplications() {
156 return ImmutableSet.copyOf(apps.values());
157 }
158
159 @Override
160 public ApplicationId getId(String name) {
161 return idStore.getAppId(name);
162 }
163
164 @Override
165 public Application getApplication(ApplicationId appId) {
166 return apps.get(appId);
167 }
168
169 @Override
170 public ApplicationState getState(ApplicationId appId) {
171 Application app = apps.get(appId);
172 InternalState s = app == null ? null : states.get(app);
173 return s == null ? null : s == ACTIVATED ?
174 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
175 }
176
177 @Override
178 public Application create(InputStream appDescStream) {
179 ApplicationDescription appDesc = saveApplication(appDescStream);
180 return create(appDesc);
181 }
182
183 private Application create(ApplicationDescription appDesc) {
184 Application app = registerApp(appDesc);
185 apps.put(app.id(), app);
186 states.put(app, INSTALLED);
187 return app;
188 }
189
190 @Override
191 public void remove(ApplicationId appId) {
192 Application app = apps.get(appId);
193 if (app != null) {
194 apps.remove(appId);
195 states.remove(app);
196 permissions.remove(app);
197 }
198 }
199
200 @Override
201 public void activate(ApplicationId appId) {
202 Application app = apps.get(appId);
203 if (app != null) {
204 states.put(app, ACTIVATED);
205 }
206 }
207
208 @Override
209 public void deactivate(ApplicationId appId) {
210 Application app = apps.get(appId);
211 if (app != null) {
212 states.put(app, DEACTIVATED);
213 }
214 }
215
216 @Override
217 public Set<Permission> getPermissions(ApplicationId appId) {
218 Application app = apps.get(appId);
219 return app != null ? permissions.get(app) : null;
220 }
221
222 @Override
223 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
224 Application app = getApplication(appId);
225 if (app != null) {
226 this.permissions.put(app, permissions);
227 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
228 }
229 }
230
231 /**
232 * Listener to application state distributed map changes.
233 */
234 private final class InternalAppStatesListener
235 implements EventuallyConsistentMapListener<Application, InternalState> {
236 @Override
237 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
238 Application app = event.key();
239 InternalState state = event.value();
240
241 if (event.type() == PUT) {
242 if (state == INSTALLED) {
243 fetchBitsIfNeeded(app);
244 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
245
246 } else if (state == ACTIVATED) {
247 installAppIfNeeded(app);
248 setActive(app.id().name());
249 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
250
251 } else if (state == DEACTIVATED) {
252 clearActive(app.id().name());
253 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
254 }
255 } else if (event.type() == REMOVE) {
256 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
257 purgeApplication(app.id().name());
258 }
259 }
260 }
261
262 /**
263 * Determines if the application bits are available locally.
264 */
265 private boolean appBitsAvailable(Application app) {
266 try {
267 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
268 return appDesc.version().equals(app.version());
269 } catch (ApplicationException e) {
270 return false;
271 }
272 }
273
274 /**
275 * Fetches the bits from the cluster peers if necessary.
276 */
277 private void fetchBitsIfNeeded(Application app) {
278 if (!appBitsAvailable(app)) {
279 fetchBits(app);
280 }
281 }
282
283 /**
284 * Installs the application if necessary from the application peers.
285 */
286 private void installAppIfNeeded(Application app) {
287 if (!appBitsAvailable(app)) {
288 fetchBits(app);
289 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
290 }
291 }
292
293 /**
294 * Fetches the bits from the cluster peers.
295 */
296 private void fetchBits(Application app) {
297 ControllerNode localNode = clusterService.getLocalNode();
298 ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800299 app.id().name().getBytes(Charsets.UTF_8));
300 //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800301 CountDownLatch latch = new CountDownLatch(1);
302
303 // FIXME: send message with name & version to make sure we don't get served old bits
304
305 log.info("Downloading bits for application {}", app.id().name());
306 for (ControllerNode node : clusterService.getNodes()) {
307 try {
308 ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
309 future.addListener(new InternalBitListener(app, node, future, latch), executor);
310 } catch (IOException e) {
311 log.debug("Unable to request bits for application {} from node {}",
312 app.id().name(), node.id());
313 }
314 }
315
316 try {
317 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
318 log.warn("Unable to fetch bits for application {}", app.id().name());
319 }
320 } catch (InterruptedException e) {
321 log.warn("Interrupted while fetching bits for application {}", app.id().name());
322 }
323 }
324
325 /**
326 * Responder to requests for application bits.
327 */
328 private class InternalBitServer implements ClusterMessageHandler {
329 @Override
330 public void handle(ClusterMessage message) {
Yuta HIGUCHI80292052015-02-10 23:11:59 -0800331 String name = new String(message.payload(), Charsets.UTF_8);
Thomas Vachuska90b453f2015-01-30 18:57:14 -0800332 try {
333 message.respond(toByteArray(getApplicationInputStream(name)));
334 } catch (Exception e) {
335 log.debug("Unable to read bits for application {}", name);
336 }
337 }
338 }
339
340 /**
341 * Processes completed fetch requests.
342 */
343 private class InternalBitListener implements Runnable {
344 private final Application app;
345 private final ControllerNode node;
346 private final ListenableFuture<byte[]> future;
347 private final CountDownLatch latch;
348
349 public InternalBitListener(Application app, ControllerNode node,
350 ListenableFuture<byte[]> future, CountDownLatch latch) {
351 this.app = app;
352 this.node = node;
353 this.future = future;
354 this.latch = latch;
355 }
356
357 @Override
358 public void run() {
359 if (latch.getCount() > 0 && !future.isCancelled()) {
360 try {
361 byte[] bits = future.get(1, MILLISECONDS);
362 saveApplication(new ByteArrayInputStream(bits));
363 log.info("Downloaded bits for application {} from node {}",
364 app.id().name(), node.id());
365 latch.countDown();
366 } catch (Exception e) {
367 log.warn("Unable to fetch bits for application {} from node {}",
368 app.id().name(), node.id());
369 }
370 }
371 }
372 }
373
374 /**
375 * Prunes applications which are not in the map, but are on disk.
376 */
377 private void pruneUninstalledApps() {
378 for (String name : getApplicationNames()) {
379 if (getApplication(getId(name)) == null) {
380 Application app = registerApp(getApplicationDescription(name));
381 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
382 purgeApplication(app.id().name());
383 }
384 }
385 }
386
387 /**
388 * Produces a registered application from the supplied description.
389 */
390 private Application registerApp(ApplicationDescription appDesc) {
391 ApplicationId appId = idStore.registerApplication(appDesc.name());
392 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
393 appDesc.origin(), appDesc.permissions(),
394 appDesc.featuresRepo(), appDesc.features());
395 }
396}
397