Added a LogicalClockService for ordering arbitrary events in the cluster. Updated couple of areas that are currently vulnerable to clock skew
Change-Id: I14548ecb3c783104de8d72cbb5eb21de6ece08ed
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 03b5b5e..5c1fc33 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -18,6 +18,7 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -44,12 +45,11 @@
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MultiValuedTimestamp;
-import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
@@ -61,8 +61,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
@@ -115,9 +113,10 @@
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ApplicationIdStore idStore;
+ protected LogicalClockService clockService;
- private final AtomicLong sequence = new AtomicLong();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ApplicationIdStore idStore;
@Activate
public void activate() {
@@ -135,24 +134,16 @@
// FIXME: Consider consolidating into a single map.
- ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
- new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
- sequence.incrementAndGet());
-
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
- .withClockService(appsClockService)
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
- ClockService<Application, InternalState> statesClockService = (app, state) ->
- new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
- sequence.incrementAndGet());
-
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
- .withClockService(statesClockService)
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
states.addListener(new InternalAppStatesListener());
@@ -160,7 +151,7 @@
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
- .withClockService(new WallclockClockManager<>())
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
log.info("Started");