Adding persistence to the gossip application store.
Change-Id: Ib1382f9d1009297dde902f0d3e0daf27596587c5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 805c9ca..d8ce7d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -18,7 +18,6 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -31,6 +30,7 @@
import org.onosproject.app.ApplicationException;
import org.onosproject.app.ApplicationState;
import org.onosproject.app.ApplicationStore;
+import org.onosproject.app.ApplicationStoreDelegate;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.common.app.ApplicationArchive;
@@ -47,6 +47,8 @@
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
@@ -59,6 +61,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -105,10 +108,13 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
+ private final AtomicLong sequence = new AtomicLong();
+
@Activate
public void activate() {
- KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(MultiValuedTimestamp.class)
.register(InternalState.class);
executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
@@ -118,35 +124,48 @@
clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
+ // FIXME: Consider consolidating the "apps", "app-states" and "app-permissions" maps into a single map.
+
+ ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
+ new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
+ sequence.incrementAndGet());
+
apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
+ serializer,
+ appsClockService);
+
+ ClockService<Application, InternalState> statesClockService = (app, state) ->
+ new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
+ sequence.incrementAndGet());
states = new EventuallyConsistentMapImpl<>("app-states",
clusterService,
clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
+ serializer,
+ statesClockService);
states.addListener(new InternalAppStatesListener());
permissions = new EventuallyConsistentMapImpl<>("app-permissions",
clusterService,
clusterCommunicator,
- intentSerializer,
+ serializer,
new WallclockClockManager<>());
- // FIXME: figure out load from disk; this will require resolving the dual authority problem
-
- executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
-
log.info("Started");
}
+ /**
+ * Loads the application inventory from disk: creates every application named by
+ * getApplicationNames() and activates each one that isActive() reports as active.
+ */
private void loadFromDisk() {
for (String name : getApplicationNames()) {
- create(getApplicationDescription(name));
- // load app permissions
+ Application app = create(getApplicationDescription(name));
+ if (app != null && isActive(app.id().name())) {
+ activate(app.id());
+ // TODO: load app permissions (nothing is loaded here yet)
+ }
}
}
@@ -162,6 +181,13 @@
}
@Override
+ public void setDelegate(ApplicationStoreDelegate delegate) {
+ super.setDelegate(delegate);
+ loadFromDisk();
+ // FIXME: scheduling of pruneUninstalledApps (after LOAD_TIMEOUT_MS) is disabled here; restore or remove.
+ }
+
+ @Override
public Set<Application> getApplications() {
return ImmutableSet.copyOf(apps.values());
}
@@ -403,5 +429,6 @@
appDesc.origin(), appDesc.permissions(),
appDesc.featuresRepo(), appDesc.features());
}
+
}