Added a LogicalClockService for ordering arbitrary events in the cluster. Updated couple of areas that are currently vulnerable to clock skew
Change-Id: I14548ecb3c783104de8d72cbb5eb21de6ece08ed
diff --git a/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java b/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java
index 2adba25..039702d 100644
--- a/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java
@@ -33,7 +33,7 @@
description = "Lists information about atomic counters in the system")
public class CountersListCommand extends AbstractShellCommand {
- private static final String FMT = "name=%s next_value=%d";
+ private static final String FMT = "name=%s value=%d";
/**
* Displays counters as text.
@@ -41,7 +41,7 @@
* @param mapInfo map descriptions
*/
private void displayCounters(Map<String, Long> counters) {
- counters.forEach((name, nextValue) -> print(FMT, name, nextValue));
+ counters.forEach((name, value) -> print(FMT, name, value));
}
/**
diff --git a/cli/src/main/java/org/onosproject/cli/net/MapsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/MapsListCommand.java
index 13e8867..27cc525 100644
--- a/cli/src/main/java/org/onosproject/cli/net/MapsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/MapsListCommand.java
@@ -37,7 +37,7 @@
// TODO: Add support to display different eventually
// consistent maps as well.
- private static final String FMT = "%-20s %8s";
+ private static final String FMT = "name=%s size=%d";
/**
* Displays map info as text.
@@ -45,17 +45,9 @@
* @param mapInfo map descriptions
*/
private void displayMaps(List<MapInfo> mapInfo) {
- print("------------------------------");
- print(FMT, "Name", "Size");
- print("------------------------------");
-
-
for (MapInfo info : mapInfo) {
print(FMT, info.name(), info.size());
}
- if (mapInfo.size() > 0) {
- print("------------------------------");
- }
}
/**
diff --git a/core/api/src/main/java/org/onosproject/store/service/LogicalClockService.java b/core/api/src/main/java/org/onosproject/store/service/LogicalClockService.java
new file mode 100644
index 0000000..799f723
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/LogicalClockService.java
@@ -0,0 +1,20 @@
+package org.onosproject.store.service;
+
+import org.onosproject.store.Timestamp;
+
+/**
+ * Service that issues logical timestamps.
+ * <p>
+ * The logical timestamps are useful for establishing a total ordering of
+ * arbitrary cluster wide events without relying on a fully synchronized
+ * system clock (wall clock)
+ */
+public interface LogicalClockService {
+
+ /**
+ * Generates a new logical timestamp.
+ *
+ * @return timestamp
+ */
+ Timestamp getTimestamp();
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 03b5b5e..5c1fc33 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -18,6 +18,7 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -44,12 +45,11 @@
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MultiValuedTimestamp;
-import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
@@ -61,8 +61,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
@@ -115,9 +113,10 @@
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ApplicationIdStore idStore;
+ protected LogicalClockService clockService;
- private final AtomicLong sequence = new AtomicLong();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ApplicationIdStore idStore;
@Activate
public void activate() {
@@ -135,24 +134,16 @@
// FIXME: Consider consolidating into a single map.
- ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
- new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
- sequence.incrementAndGet());
-
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
- .withClockService(appsClockService)
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
- ClockService<Application, InternalState> statesClockService = (app, state) ->
- new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
- sequence.incrementAndGet());
-
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
- .withClockService(statesClockService)
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
states.addListener(new InternalAppStatesListener());
@@ -160,7 +151,7 @@
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
- .withClockService(new WallclockClockManager<>())
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
log.info("Started");
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
index 13690d4..8d14b4e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
@@ -26,11 +26,11 @@
import org.onosproject.cfg.ComponentConfigStore;
import org.onosproject.cfg.ComponentConfigStoreDelegate;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
@@ -59,6 +59,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LogicalClockService clockService;
+
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
@@ -67,7 +70,7 @@
properties = storageService.<String, String>eventuallyConsistentMapBuilder()
.withName("cfg")
.withSerializer(serializer)
- .withClockService(new WallclockClockManager<>())
+ .withClockService((k, v) -> clockService.getTimestamp())
.build();
properties.addListener(new InternalPropertiesListener());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java
new file mode 100644
index 0000000..69a50f0
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java
@@ -0,0 +1,51 @@
+package org.onosproject.store.core.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.impl.LogicalTimestamp;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+/**
+ * LogicalClockService implementation based on a AtomicCounter.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class LogicalClockManager implements LogicalClockService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private static final String SYSTEM_LOGICAL_CLOCK_COUNTER_NAME = "sys-clock-counter";
+ private AtomicCounter atomicCounter;
+
+ @Activate
+ public void activate() {
+ atomicCounter = storageService.atomicCounterBuilder()
+ .withName(SYSTEM_LOGICAL_CLOCK_COUNTER_NAME)
+ .withPartitionsDisabled()
+ .build();
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped.");
+ }
+
+ @Override
+ public Timestamp getTimestamp() {
+ return new LogicalTimestamp(atomicCounter.incrementAndGet());
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 0487084..99eef3d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
@@ -32,6 +33,7 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
@@ -223,6 +225,7 @@
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
+ .register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(PutEntry.class)
.register(RemoveEntry.class)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java b/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java
new file mode 100644
index 0000000..5ae8b4f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java
@@ -0,0 +1,68 @@
+package org.onosproject.store.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Objects;
+
+import org.onosproject.store.Timestamp;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ComparisonChain;
+
+/**
+ * Timestamp based on logical sequence value.
+ * <p>
+ * LogicalTimestamps are ordered by their sequence values.
+ */
+public class LogicalTimestamp implements Timestamp {
+
+ private final long value;
+
+ public LogicalTimestamp(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(Timestamp o) {
+ checkArgument(o instanceof LogicalTimestamp,
+ "Must be LogicalTimestamp", o);
+ LogicalTimestamp that = (LogicalTimestamp) o;
+
+ return ComparisonChain.start()
+ .compare(this.value, that.value)
+ .result();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof LogicalTimestamp)) {
+ return false;
+ }
+ LogicalTimestamp that = (LogicalTimestamp) obj;
+ return Objects.equals(this.value, that.value);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("value", value)
+ .toString();
+ }
+
+ /**
+ * Returns the sequence value.
+ *
+ * @return sequence value
+ */
+ public long value() {
+ return this.value;
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 8629fcd..0a3d5e6 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -35,6 +36,7 @@
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.ClockService;
+import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
@@ -102,6 +104,7 @@
.register(KryoNamespaces.API)
.register(TestTimestamp.class)
// Below is the classes that the map internally registers
+ .register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(PutEntry.class)
.register(RemoveEntry.class)