SharedLog runtime proto
ONOS-1806, ONOS-1807, ONOS-1808
Change-Id: Ic86cb7bdc6b04a81180ab43afa01c1aea4ac18c7
diff --git a/conf/hazelcast.default.xml b/conf/hazelcast.default.xml
index 633f1a9..a05ff8c 100644
--- a/conf/hazelcast.default.xml
+++ b/conf/hazelcast.default.xml
@@ -29,6 +29,9 @@
<data-serializable-factories>
<data-serializable-factory factory-id="1">net.onrc.onos.core.datastore.hazelcast.VersionedValueSerializableFactory</data-serializable-factory>
</data-serializable-factories>
+ <serializers>
+ <serializer type-class="net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue">net.onrc.onos.core.util.distributed.sharedlog.hazelcast.LogValueSerializer</serializer>
+ </serializers>
</serialization>
<!-- Configuration of maps used as a data store. -->
@@ -41,6 +44,38 @@
<!-- near cache must not be used -->
</map>
+ <!-- Configuration of maps used as a log store. -->
+ <map name="logmap://*">
+ <!-- must use 'sync' backup to imitate other data store -->
+ <backup-count>3</backup-count>
+ <async-backup-count>0</async-backup-count>
+ <!-- can be true, since log map value is basically immutable -->
+ <read-backup-data>true</read-backup-data>
+ <!-- near cache can be enabled -->
+ <near-cache>
+ <!-- TODO revisit random params -->
+ <max-size>5000</max-size>
+ <time-to-live-seconds>300</time-to-live-seconds>
+ <eviction-policy>LRU</eviction-policy>
+ </near-cache>
+ </map>
+
+ <!-- Configuration of maps used as a snap shot store. -->
+ <map name="ssmap://*">
+ <!-- must use 'sync' backup to imitate other data store -->
+ <backup-count>3</backup-count>
+ <async-backup-count>0</async-backup-count>
+ <!-- can be true, since snap shot map value is basically immutable -->
+ <read-backup-data>true</read-backup-data>
+ <!-- near cache can be enabled -->
+ <near-cache>
+ <!-- TODO revisit random params -->
+ <max-size>10</max-size>
+ <time-to-live-seconds>300</time-to-live-seconds>
+ <eviction-policy>LRU</eviction-policy>
+ </near-cache>
+ </map>
+
<map name="*">
<!--
diff --git a/conf/template/hazelcast.xml.template b/conf/template/hazelcast.xml.template
index 6e511ef..d06addc 100644
--- a/conf/template/hazelcast.xml.template
+++ b/conf/template/hazelcast.xml.template
@@ -32,6 +32,9 @@
<data-serializable-factories>
<data-serializable-factory factory-id="1">net.onrc.onos.core.datastore.hazelcast.VersionedValueSerializableFactory</data-serializable-factory>
</data-serializable-factories>
+ <serializers>
+ <serializer type-class="net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue">net.onrc.onos.core.util.distributed.sharedlog.hazelcast.LogValueSerializer</serializer>
+ </serializers>
</serialization>
<!-- Configuration of maps used as a data store. -->
@@ -44,6 +47,38 @@
<!-- near cache must not be used -->
</map>
+ <!-- Configuration of maps used as a log store. -->
+ <map name="logmap://*">
+ <!-- must use 'sync' backup to imitate other data store -->
+ <backup-count>3</backup-count>
+ <async-backup-count>0</async-backup-count>
+ <!-- can be true, since log map value is basically immutable -->
+ <read-backup-data>true</read-backup-data>
+ <!-- near cache can be enabled -->
+ <near-cache>
+ <!-- TODO revisit random params -->
+ <max-size>5000</max-size>
+ <time-to-live-seconds>300</time-to-live-seconds>
+ <eviction-policy>LRU</eviction-policy>
+ </near-cache>
+ </map>
+
+ <!-- Configuration of maps used as a snap shot store. -->
+ <map name="ssmap://*">
+ <!-- must use 'sync' backup to imitate other data store -->
+ <backup-count>3</backup-count>
+ <async-backup-count>0</async-backup-count>
+ <!-- can be true, since snap shot map value is basically immutable -->
+ <read-backup-data>true</read-backup-data>
+ <!-- near cache can be enabled -->
+ <near-cache>
+ <!-- TODO revisit random params -->
+ <max-size>10</max-size>
+ <time-to-live-seconds>300</time-to-live-seconds>
+ <eviction-policy>LRU</eviction-policy>
+ </near-cache>
+ </map>
+
<map name="*">
<!--
diff --git a/pom.xml b/pom.xml
index 59bdf45..5d06100 100644
--- a/pom.xml
+++ b/pom.xml
@@ -630,6 +630,7 @@
<version>1.10</version>
</dependency>
<dependency>
+ <!-- Remove me when ready -->
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
@@ -645,6 +646,11 @@
<version>4.0</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.2</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
@@ -717,6 +723,11 @@
<artifactId>openflowj</artifactId>
<version>0.3.8-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.4</version>
+ </dependency>
<!-- Floodlight's dependencies -->
<dependency>
<groupId>args4j</groupId>
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
index eaa647c..f725248 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -18,6 +18,7 @@
import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,7 +187,7 @@
// This method is no longer required, if equibalent to the following
// is defined in hazelcast.xml
config.addDataSerializableFactoryClass(
- VersionedValueSerializableFactory.FACTORY_ID,
+ HazelcastSerializationConstants.VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID,
VersionedValueSerializableFactory.class);
}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
index 0164433..d858423 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
@@ -17,6 +17,7 @@
import net.onrc.onos.core.datastore.ObjectExistsException;
import net.onrc.onos.core.datastore.WrongVersionException;
import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
@@ -117,12 +118,12 @@
@Override
public int getFactoryId() {
- return VersionedValueSerializableFactory.FACTORY_ID;
+ return HazelcastSerializationConstants.VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID;
}
@Override
public int getId() {
- return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+ return HazelcastSerializationConstants.VERSIONED_VALUE_TYPE_ID;
}
@Override
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
index 7177695..d4ed1b2 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -1,5 +1,7 @@
package net.onrc.onos.core.datastore.hazelcast;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
+
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
@@ -8,21 +10,11 @@
*/
public class VersionedValueSerializableFactory implements
DataSerializableFactory {
- // revisit these magic numbers
- /**
- * IdentifiedDataSerializable Factory ID.
- */
- public static final int FACTORY_ID = 1;
-
- /**
- * IdentifiedDataSerializable type ID for HZTable.VersionedValue class.
- */
- public static final int VERSIONED_VALUE_ID = 1;
@Override
public IdentifiedDataSerializable create(final int typeId) {
switch (typeId) {
- case VERSIONED_VALUE_ID:
+ case HazelcastSerializationConstants.VERSIONED_VALUE_TYPE_ID:
return new HZTable.VersionedValue();
default:
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/ByteValue.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/ByteValue.java
new file mode 100644
index 0000000..26f7e75
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/ByteValue.java
@@ -0,0 +1,45 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+
+// TODO register to Kryo?
+// TODO Should this be final, or should we allow sub-class
+// TODO Not sure if reusing for SnapShotValue is good idea
+/**
+ * Regular Log Map Value.
+ */
+@Beta
+@Immutable
+public final class ByteValue implements LogValue, SnapShotValue {
+
+ private final byte[] bytes;
+
+ /**
+ * Construct ByteValue.
+ *
+ * @param bytes must not be null
+ */
+ public ByteValue(final byte[] bytes) {
+ this.bytes = Arrays.copyOf(checkNotNull(bytes), bytes.length);
+ }
+
+ /**
+ * Gets the stored bytes.
+ *
+ * @return serialized bytes
+ */
+ public byte[] getBytes() {
+ return Arrays.copyOf(bytes, bytes.length);
+ }
+
+ // TODO toString?
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/LogEventListener.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/LogEventListener.java
new file mode 100644
index 0000000..51f9f85
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/LogEventListener.java
@@ -0,0 +1,19 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Listener interface for SharedLogObject consumer.
+ */
+@Beta
+public interface LogEventListener {
+
+ // TODO Whether to expose logValue is TBD
+ // if exposed, one may manually apply logValue without going through runtime
+ /**
+ * Notification for .
+ *
+ * @param seq updated log entry sequence number
+ */
+ public void logAdded(SeqNum seq/*, ByteValue logValue*/);
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNum.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNum.java
new file mode 100644
index 0000000..480138b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNum.java
@@ -0,0 +1,197 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.primitives.UnsignedLongs;
+
+// TODO register to Kryo
+/**
+ * Sequence number used as log entry key.
+ */
+@Beta
+@Immutable
+public final class SeqNum extends Number implements Comparable<SeqNum> {
+
+ /**
+ * Long value reserved for {@link #INITIAL}.
+ */
+ public static final long ZERO = 0L;
+
+ /**
+ * Special sequence number which will never have LogValue.
+ */
+ public static final SeqNum INITIAL = new SeqNum(ZERO);
+
+
+ private final long seq;
+
+ /**
+ * Constructor.
+ *
+ * @param number sequence number.
+ */
+ protected SeqNum(final long number) {
+ this.seq = number;
+ }
+
+ /**
+ * Gets an instance for specified number.
+ *
+ * @param number must not be {@link #ZERO}
+ * @return SeqNum
+ *
+ * @throws IllegalArgumentException if given number was {@link #ZERO}
+ */
+ public static SeqNum valueOf(final long number) {
+ checkArgument(number != ZERO);
+ return new SeqNum(number);
+ }
+
+ // TODO give better name?
+ /**
+ * Gets an instance for specified number.
+ *
+ * @param number sequence number. Can be {@link #ZERO}
+ * @return SeqNum
+ */
+ public static SeqNum anyValueOf(final long number) {
+ return new SeqNum(number);
+ }
+
+ /**
+ * Gets the next sequence number.
+ * <p>
+ * WARN: This is not a atomic sequencer,
+ * this method just returns the next number in sequence.
+ *
+ * @return next sequence number
+ */
+ public SeqNum next() {
+ return step(1);
+ }
+
+ /**
+ * Gets the previous sequence number.
+ * <p>
+ * WARN: This is not a atomic sequencer,
+ * this method just returns the previous number in sequence.
+ *
+ * @return prev sequence number
+ */
+ public SeqNum prev() {
+ return step(-1);
+ }
+
+ /**
+ * Gets the sequence number stepping forward/backward by {@code delta}.
+ *
+ * @param delta step
+ * @return sequence number
+ */
+ public SeqNum step(long delta) {
+ long next = seq + delta;
+ if (next == SeqNum.ZERO) {
+ if (delta >= 0) {
+ return SeqNum.valueOf(next + 1);
+ } else {
+ // XXX Revisit this behavior
+ return SeqNum.valueOf(next - 1);
+ }
+ }
+ return SeqNum.valueOf(next);
+ }
+
+ // TODO comparator which treats long as ring?
+
+ @Override
+ public int intValue() {
+ return UnsignedLong.valueOf(seq).intValue();
+ }
+
+ @Override
+ public long longValue() {
+ return seq;
+ }
+
+ @Override
+ public float floatValue() {
+ return UnsignedLong.valueOf(seq).floatValue();
+ }
+
+ @Override
+ public double doubleValue() {
+ return UnsignedLong.valueOf(seq).doubleValue();
+ }
+
+ @Override
+ public int hashCode() {
+ return Longs.hashCode(seq);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ SeqNum other = (SeqNum) obj;
+ return seq == other.seq;
+ }
+
+ @Override
+ public String toString() {
+ return UnsignedLongs.toString(seq);
+ }
+
+ @Override
+ public int compareTo(final SeqNum o) {
+ checkNotNull(o);
+
+ return Long.signum(-this.distance(o));
+ }
+
+ /**
+ * Returns the distance between other and this SeqNum.
+ *
+ * @param other other SeqNum
+ * @return {@code other - this}
+ * or -Long.MAX_VALUE, Log.MAX_VALUE if too far apart.
+ */
+ public long distance(final SeqNum other) {
+ checkNotNull(other);
+
+ // distance from INITIAL is always measured clockwise on the ring
+ if (this.equals(INITIAL)) {
+ return (other.seq >= 0) ? other.seq : Long.MAX_VALUE;
+ } else if (other.equals(INITIAL)) {
+ return (this.seq >= 0) ? -this.seq : -Long.MAX_VALUE;
+ }
+
+ /// other cases measures using "shorter" half of the ring
+ final long diff = other.seq - this.seq;
+ if (diff == Long.MIN_VALUE) {
+ // both arc is same distance
+ // treat arc including INITIAL as shorter
+ if (this.seq < 0) {
+ // clock wise arc contain INITIAL
+ return Long.MAX_VALUE;
+ } else {
+ // counter clock wise arc contain INITIAL
+ return -Long.MAX_VALUE;
+ }
+ } else {
+ return diff;
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/Sequencer.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/Sequencer.java
new file mode 100644
index 0000000..fa63c33
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/Sequencer.java
@@ -0,0 +1,27 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Sequencer for LogBasedRuntime.
+ */
+@Beta
+@ThreadSafe
+public interface Sequencer {
+
+ /**
+ * Gets the current sequence number.
+ *
+ * @return current sequence number
+ */
+ public SeqNum get();
+
+ /**
+ * Gets the next sequence number.
+ *
+ * @return next sequence number
+ */
+ public SeqNum next();
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObject.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObject.java
new file mode 100644
index 0000000..75c155f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObject.java
@@ -0,0 +1,129 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+
+// TODO Should this be abstract class instead, to hide apply/isApplicable?
+// MEMO: Something SharedObject implementor defines
+/**
+ * Shared object backed by shared log structure.
+ *
+ * Object state must be updated only by {@link #apply(SeqNum, LogValue)}
+ */
+@Beta
+@NotThreadSafe // FIXME remove when we make these objects thread safe.
+public interface SharedLogObject {
+
+ /**
+ * ID of this Shared Object.
+ *
+ * @return SharedLogObjectID
+ */
+// @ThreadSafe // TODO find annotation for method or add javadoc
+ public SharedLogObjectID getObjectID();
+
+ /**
+ * Gets the current log sequence number this instance is on.
+ *
+ * @return Log version of this object.
+ */
+// @ThreadSafe
+ public SeqNum getSeqNum();
+
+ // FIXME Is there a good way to ensure this will be called only by Runtime
+ /**
+ * Apply changes to the shared object.
+ * <p/>
+ * Developer implementing shared object must implement this method to:
+ * <ul>
+ * <li>update local instance based on given logValue if ByteValue.</li>
+ * <li>update SeqNum to given seq.</li>
+ * </ul>
+ * <p/>
+ * Developer must also ensure that this method will never fail.
+ * Any potential error check should be checked beforehand on
+ * {@link #isApplicable(SeqNum, ByteValue)} call.
+ * <p/>
+ * Modification to this shared object instance should only happen
+ * inside this method.
+ * <p/>
+ * This method should only be called by the runtime.
+ *
+ * This method will be called as a side-effect of calling
+ * {@link net.onrc.onos.core.util.distributed.sharedlog.runtime
+ * .LogBasedRuntime#queryHelper(SharedLogObject)
+ * LogBasedRuntime#queryHelper(SharedLogObject)}.
+ *
+ * @param seq sequence number of the LogValue
+ * @param logValue {@link ByteValue} to apply or NoOp
+ */
+ @GuardedBy("acquireWriteLock()")
+ void apply(final SeqNum seq, final LogValue logValue);
+
+ /**
+ * Tests if given LogValue is applicable to this .
+ * <p/>
+ * This method will be called before {@link #apply(SeqNum, LogValue)} call.
+ * This method should be implemented to be side-effect free.
+ *
+ * @param seq sequence number of the LogValue
+ * @param logValue LogValue to test
+ * @return true if {@code data} is applicable
+ */
+ @GuardedBy("acquireWriteLock()")
+ public boolean isApplicable(final SeqNum seq, final ByteValue logValue);
+
+
+ // TODO give me better name for SharedLogObject#reset
+ /**
+ * Resets the object to specified snapshot value.
+ *
+ * @param seq Log version of this snapshot
+ * @param ssValue snapshot {@link ByteValue} to apply or NoOp representing initial state.
+ */
+ @GuardedBy("acquireWriteLock()")
+ void reset(final SeqNum seq, final SnapShotValue ssValue);
+
+ /**
+ * Creates a snapshot value of current object.
+ *
+ * @return (current log version, snapshot value)
+ */
+ @GuardedBy("acquireReadLock()")
+ ImmutablePair<SeqNum, ? extends SnapShotValue> createSnapshot();
+
+ /**
+ * Acquires read lock for this object.
+ * <p/>
+ * Note: Lock implementation must be reentrant.
+ */
+ public void acquireReadLock();
+
+ /**
+ * Releases read lock for this object.
+ * <p/>
+ * Note: Lock implementation must be reentrant.
+ */
+ public void releaseReadLock();
+
+ /**
+ * Acquires write lock for this object.
+ * <p/>
+ * Note: Lock implementation must be reentrant.
+ */
+ public void acquireWriteLock();
+
+ /**
+ * Releases write lock for this object.
+ * <p/>
+ * Note: Lock implementation must be reentrant.
+ */
+ public void releaseWriteLock();
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObjectID.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObjectID.java
new file mode 100644
index 0000000..abf9bf0
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObjectID.java
@@ -0,0 +1,19 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * ID object to identify each SharedLogObject.
+ * <p/>
+ * Class implementing this interface must implement equals and hashCode.
+ */
+@Beta
+public interface SharedLogObjectID {
+
+ /**
+ * String name for SharedLogObject.
+ *
+ * @return String name for SharedLogObject
+ */
+ public String getObjectName();
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLong.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLong.java
new file mode 100644
index 0000000..b2bad85
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLong.java
@@ -0,0 +1,384 @@
+package net.onrc.onos.core.util.distributed.sharedlog.example;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.utils.KryoSerializer;
+import net.onrc.onos.core.datastore.utils.Serializer;
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObject;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogNotApplicable;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogWriteTimedOut;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.NoOp;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.LogBasedRuntime;
+
+/**
+ * Example implementing CAS-able Long as {@link SharedLogObject}.
+ * <p>
+ * This is just an example implementation of SharedLogObject,
+ * not intended for actual use.
+ */
+public class LogAtomicLong implements SharedLogObject {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(LogAtomicLong.class);
+
+ private static final Serializer SERIALIZER = new KryoSerializer(
+ SetEvent.class,
+ CompareAndSetEvent.class);
+
+
+ private final LogBasedRuntime runtime;
+ private final SharedLogObjectID oid;
+
+ private volatile SeqNum current;
+ private long value;
+
+ // adding volatile to `value` is good-enough for this example in reality
+ private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private Lock readLock = readWriteLock.readLock();
+ private Lock writeLock = readWriteLock.writeLock();
+
+
+ /**
+ * Constructor to create instance synced to latest snapshot.
+ *
+ * @param runtime {@link LogBasedRuntime} to use
+ * @param name name of this {@link SharedLogObject}
+ */
+ public LogAtomicLong(LogBasedRuntime runtime, String name) {
+ this(runtime, checkNotNull(runtime).getOid(name));
+ }
+
+ /**
+ * Constructor to create instance synced to latest snapshot.
+ *
+ * @param runtime {@link LogBasedRuntime} to use
+ * @param name name of this {@link SharedLogObject}
+ * @param ssId snap shot ID to initialize to
+ */
+ public LogAtomicLong(LogBasedRuntime runtime, String name, SeqNum ssId) {
+ this(runtime, checkNotNull(runtime).getOid(name), ssId);
+ }
+
+ /**
+ * Constructor to create instance synced to latest snapshot.
+ *
+ * @param runtime {@link LogBasedRuntime} to use
+ * @param oid {@link SharedLogObjectID} of this object
+ */
+ protected LogAtomicLong(LogBasedRuntime runtime, SharedLogObjectID oid) {
+ this(runtime, oid, checkNotNull(runtime).getLatestSnapShotId(oid));
+ }
+
+ /**
+ * Constructor to initialize to specified snap shot.
+ *
+ * @param runtime {@link LogBasedRuntime} to use
+ * @param oid {@link SharedLogObjectID} of this object
+ * @param ssId snap shot ID to initialize to
+ */
+ protected LogAtomicLong(LogBasedRuntime runtime, SharedLogObjectID oid, SeqNum ssId) {
+ this.runtime = checkNotNull(runtime);
+ this.oid = checkNotNull(oid);
+ boolean success = runtime.resetToSnapShot(this, ssId);
+ if (!success) {
+ throw new IllegalStateException(
+ "Failed to initialize to specified Snapshot " + ssId);
+ }
+ }
+
+
+ /**
+ * LogMessage for {@link LogAtomicLong#set(long)}.
+ */
+ private static final class SetEvent {
+
+ private final long newValue;
+
+ /**
+ * Constructor.
+ *
+ * @param newValue value to set
+ */
+ public SetEvent(long newValue) {
+ this.newValue = newValue;
+ }
+
+ /**
+ * Gets the new value to set.
+ *
+ * @return new value
+ */
+ public long getNewValue() {
+ return newValue;
+ }
+
+ /**
+ * Default constructor for deserializer.
+ */
+ @SuppressWarnings("unused")
+ @Deprecated
+ public SetEvent() {
+ this.newValue = 0L;
+ }
+ }
+
+ /**
+ * LogMessage for {@link LogAtomicLong#compareAndSet(long, long)}.
+ */
+ private static final class CompareAndSetEvent {
+
+ private final long expect;
+ private final long update;
+
+ /**
+ * Constructor.
+ *
+ * @param expect expected value
+ * @param update new value
+ */
+ public CompareAndSetEvent(long expect, long update) {
+ this.expect = expect;
+ this.update = update;
+ }
+
+ /**
+ * Gets the expected value.
+ *
+ * @return expected value
+ */
+ public long getExpect() {
+ return expect;
+ }
+
+ /**
+ * Gets the new value to update.
+ *
+ * @return new value to update
+ */
+ public long getUpdate() {
+ return update;
+ }
+
+ /**
+ * Default constructor for deserializer.
+ */
+ @SuppressWarnings("unused")
+ @Deprecated
+ public CompareAndSetEvent() {
+ this.expect = 0L;
+ this.update = 0L;
+ }
+ }
+
+
+ /**
+ * Sets the specified value regardless of current value.
+ *
+ * @param newValue new value to set
+ */
+ public void set(long newValue) {
+ ByteValue newLog = serialize(new SetEvent(newValue));
+
+ try {
+ // no need to replay to latest log (3rd option)
+ runtime.updateHelper(this, newLog, false);
+ } catch (LogNotApplicable e) {
+ log.error("Should never happen", e);
+ } catch (LogWriteTimedOut e) {
+ log.warn("Timeout, retrying", e);
+ set(newValue);
+ }
+ }
+
+ /**
+ * Compare and set new value.
+ *
+ * @param expect value expected
+ * @param update new value
+ * @return true if successfully updated
+ */
+ public boolean compareAndSet(long expect, long update) {
+ ByteValue newLog = serialize(new CompareAndSetEvent(expect, update));
+
+ try {
+ runtime.updateHelper(this, newLog, true);
+ return true;
+ } catch (LogNotApplicable e) {
+ return false;
+ } catch (LogWriteTimedOut e) {
+ log.warn("Timeout, retrying", e);
+ return compareAndSet(expect, update);
+ }
+ }
+
+ /**
+ * Serializes SetEvent to ByteValue.
+ *
+ * @param event {@link SetEvent}
+ * @return {@link ByteValue}
+ */
+ private ByteValue serialize(SetEvent event) {
+ return new ByteValue(SERIALIZER.serialize(event));
+ }
+
+ /**
+ * Serializes CompareAndSetEvent to ByteValue.
+ *
+ * @param event {@link CompareAndSetEvent}
+ * @return {@link ByteValue}
+ */
+ private ByteValue serialize(CompareAndSetEvent event) {
+ return new ByteValue(SERIALIZER.serialize(event));
+ }
+
+ /**
+ * Gets the current value.
+ *
+ * @return current value
+ */
+ public long get() {
+ runtime.queryHelper(this);
+ acquireReadLock();
+ try {
+ return value;
+ } finally {
+ releaseReadLock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return Long.toString(get());
+ }
+
+ @Override
+ public SharedLogObjectID getObjectID() {
+ return this.oid;
+ }
+
+ @Override
+ public SeqNum getSeqNum() {
+ return this.current;
+ }
+
+ @GuardedBy("writeLock")
+ @Override
+ public void apply(SeqNum seq, LogValue logValue) {
+ // This should be the only method modifying this instance's field
+
+ if (logValue instanceof ByteValue) {
+ ByteValue byteValue = (ByteValue) logValue;
+ final Object event = SERIALIZER.deserialize(byteValue.getBytes());
+ if (event instanceof SetEvent) {
+ applySetEvent(seq, (SetEvent) event);
+ }
+ if (event instanceof CompareAndSetEvent) {
+ applyCompareAndSetEvent(seq, (CompareAndSetEvent) event);
+ }
+
+ } else if (logValue instanceof NoOp) {
+ this.current = seq;
+ }
+ }
+
+ /**
+ * Applies {@link CompareAndSetEvent}.
+ *
+ * @param seq sequence number of event
+ * @param casEvent {@link CompareAndSetEvent}
+ */
+ @GuardedBy("writeLock")
+ private void applyCompareAndSetEvent(SeqNum seq, final CompareAndSetEvent casEvent) {
+ assert (this.value == casEvent.expect);
+ this.current = seq;
+ this.value = casEvent.getUpdate();
+ }
+
+ /**
+ * Applies {@link SetEvent}.
+ *
+ * @param seq sequence number of event
+ * @param setEvent {@link SetEvent}
+ */
+ @GuardedBy("writeLock")
+ private void applySetEvent(SeqNum seq, final SetEvent setEvent) {
+ this.current = seq;
+ this.value = setEvent.getNewValue();
+ }
+
+ @GuardedBy("writeLock")
+ @Override
+ public void reset(SeqNum seq, SnapShotValue ssValue) {
+ if (ssValue instanceof ByteValue) {
+ ByteValue byteValue = (ByteValue) ssValue;
+ final Object event = SERIALIZER.deserialize(byteValue.getBytes());
+ if (event instanceof SetEvent) {
+ applySetEvent(seq, (SetEvent) event);
+ } else {
+ log.error("Unexpected SnapShot ByteValue encountered {}", event);
+ }
+
+ } else if (ssValue instanceof NoOp) {
+ this.current = seq;
+ this.value = 0L;
+ } else {
+ log.error("Unexpected SnapShotValue encountered {}", ssValue);
+ }
+ }
+
+ @GuardedBy("readLock")
+ @Override
+ public ImmutablePair<SeqNum, ? extends SnapShotValue> createSnapshot() {
+ return ImmutablePair.of(getSeqNum(), serialize(new SetEvent(get())));
+ }
+
+ @GuardedBy("readLock")
+ @Override
+ public boolean isApplicable(SeqNum seq, ByteValue logValue) {
+
+ final Object event = SERIALIZER.deserialize(logValue.getBytes());
+ if (event instanceof SetEvent) {
+ return true;
+ }
+ if (event instanceof CompareAndSetEvent) {
+ CompareAndSetEvent casEvent = (CompareAndSetEvent) event;
+ return casEvent.getExpect() == value;
+ }
+ return false;
+ }
+
+ @Override
+ public void acquireReadLock() {
+ this.readLock.lock();
+ }
+
+ @Override
+ public void releaseReadLock() {
+ this.readLock.unlock();
+ }
+
+ @Override
+ public void acquireWriteLock() {
+ this.writeLock.lock();
+ }
+
+ @Override
+ public void releaseWriteLock() {
+ this.writeLock.unlock();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotApplicable.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotApplicable.java
new file mode 100644
index 0000000..f41f11e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotApplicable.java
@@ -0,0 +1,51 @@
+package net.onrc.onos.core.util.distributed.sharedlog.exception;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Exception thrown, when log cannot be applied to the shared log object.
+ */
+@Beta
+public class LogNotApplicable extends Exception {
+
+ private static final long serialVersionUID = 670547401260137360L;
+
+ /**
+ * {@link Exception#Exception()}.
+ */
+ public LogNotApplicable() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message.
+ * {@link Exception#Exception(String)}
+ *
+ * @param message failure description
+ */
+ public LogNotApplicable(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause.
+ * {@link Exception#Exception(Throwable)}
+ *
+ * @param cause exception causing this.
+ */
+ public LogNotApplicable(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ * {@link Exception#Exception(String, Throwable)}
+ *
+ * @param message failure description
+ * @param cause exception causing this.
+ */
+ public LogNotApplicable(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotContiguous.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotContiguous.java
new file mode 100644
index 0000000..a68ec4a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotContiguous.java
@@ -0,0 +1,65 @@
+package net.onrc.onos.core.util.distributed.sharedlog.exception;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+
+/**
+ * Exception thrown, when log can no longer be replayed.
+ * Caller need to jump to the available snapshot and restart from snapshot.
+ */
+@Beta
+public class LogNotContiguous extends Exception {
+
+ /**
+ * Construct a new exception.
+ *
+ * @param failed sequence number which failed
+ */
+ public LogNotContiguous(final SeqNum failed) {
+ this(failed + " cannot be read");
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message.
+ * {@link Exception#Exception(String)}
+ *
+ * @param message failure description
+ */
+ protected LogNotContiguous(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause.
+ * {@link Exception#Exception(Throwable)}
+ *
+ * @param failed sequence number which failed
+ * @param cause exception causing this.
+ */
+ public LogNotContiguous(SeqNum failed, Throwable cause) {
+ this(failed + " cannot be read", cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause.
+ * {@link Exception#Exception(Throwable)}
+ *
+ * @param cause exception causing this.
+ */
+ protected LogNotContiguous(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ * {@link Exception#Exception(String, Throwable)}
+ *
+ * @param message failure description
+ * @param cause exception causing this.
+ */
+ public LogNotContiguous(String message, Throwable cause) {
+ super(message, cause);
+ // TODO Auto-generated constructor stub
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogWriteTimedOut.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogWriteTimedOut.java
new file mode 100644
index 0000000..4e9c38d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogWriteTimedOut.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.core.util.distributed.sharedlog.exception;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Exception thrown, when allocated sequence number timed out.
+ */
+@Beta
+public class LogWriteTimedOut extends Exception {
+
+ /**
+ * {@link Exception#Exception()}.
+ */
+ public LogWriteTimedOut() {
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message.
+ * {@link Exception#Exception(String)}
+ *
+ * @param message failure description
+ */
+ public LogWriteTimedOut(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause.
+ * {@link Exception#Exception(Throwable)}
+ *
+ * @param cause exception causing this.
+ */
+ public LogWriteTimedOut(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ * {@link Exception#Exception(String, Throwable)}
+ *
+ * @param message failure description
+ * @param cause exception causing this.
+ */
+ public LogWriteTimedOut(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastRuntime.java
new file mode 100644
index 0000000..3becaa9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastRuntime.java
@@ -0,0 +1,621 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.LogEventListener;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObject;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogNotApplicable;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogWriteTimedOut;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.StringID;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.NoOp;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.LogBasedRuntime;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.SequencerRuntime;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.IFunction;
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+
+
+/**
+ * LogBasedRuntime using Hazelcast.
+ */
+@Beta
+public class HazelcastRuntime implements LogBasedRuntime {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(HazelcastRuntime.class);
+
+ // Hazelcast distributed object name prefixes
+ private static final String LATEST_SNAPSHOT_PREFIX = "latestSS://";
+ private static final String SNAPSHOTMAP_PREFIX = "ssmap://";
+ private static final String LOGMAP_PREFIX = "logmap://";
+
+ // FIXME set appropriate timeout & make it configurable
+ private static final long LOG_READ_TIMEOUT_MS = 1000L;
+
+ // FIXME make these configurable from file, etc.
+ /**
+ * Number of times to retry, if write to LogMap failed due to
+ * other node invalidated the sequencer due to timeout.
+ */
+ private static final int UPDATE_RETRIES = 5;
+
+ /**
+ * Interval to check if we should create SnapShot.
+ * <p>
+ * If the sequence number of the log just written is multiple of this value,
+ * SnapShot check and log cleanup will be triggered.
+ */
+ private static final int SNAPSHOT_CHECK_INTERVAL = 50;
+
+ /**
+ * Interval SnapShot should be created.
+ * <p>
+ * SnapShot will be created if latest SnapShot was more than
+ * this interval apart from last written.
+ */
+ public static final int SNAPSHOT_INTERVAL = 500;
+
+ /**
+ * Maximum number of Snapshots to store in distributed Map.
+ */
+ public static final int MAX_SNAPSHOTS = 10;
+
+ // TODO create ThreadFactory to assign thread name and lower priority
+ // executor for SnapShot builder
+ private static final ExecutorService EXECUTOR =
+ new ThreadPoolExecutor(1, 5, 3L, TimeUnit.MINUTES,
+ new ArrayBlockingQueue<Runnable>(10),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ private final HazelcastInstance instance;
+ private final SequencerRuntime sequencerRuntime;
+
+ private final ConcurrentMap<SharedLogObjectID, LogMapManager>
+ listenerMap;
+
+ /**
+ * Initializes Hazelcast based LogRuntime.
+ *
+ * @param instance Hazelcast instance to use
+ * @param sequencerRuntime {@link SequencerRuntime} to use
+ */
+ public HazelcastRuntime(HazelcastInstance instance,
+ SequencerRuntime sequencerRuntime) {
+
+ this.instance = checkNotNull(instance);
+ this.sequencerRuntime = checkNotNull(sequencerRuntime);
+ this.listenerMap = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public SharedLogObjectID getOid(String sharedObjectName) {
+
+ // TODO Create specialized SharedLogObject, if getting IMap from
+ // HZ instance each time becomes a significant overhead.
+
+ return new StringID(sharedObjectName);
+ }
+
+ @Override
+ public SeqNum updateHelper(SharedLogObject sobj, ByteValue logValue,
+ boolean queryBeforeUpdate)
+ throws LogNotApplicable, LogWriteTimedOut {
+
+ return updateHelper(sobj, logValue, queryBeforeUpdate, UPDATE_RETRIES);
+ }
+
+ /**
+ * Apply given logValue to SharedLogObject.
+ *
+ * @param sobj SharedLogObject to manipulate.
+ * @param logValue LogValue to apply
+ * @param queryBeforeUpdate true if log should be replayed to latest before updating
+ * @param retries number of retries
+ * @return sequence number of the new log entry
+ *
+ * @throws LogNotApplicable Thrown when {@code logValue} was not applicable to
+ * SharedLogObject.
+ * @throws LogWriteTimedOut Thrown when write failed due to time out.
+ */
+ private SeqNum updateHelper(final SharedLogObject sobj, ByteValue logValue,
+ boolean queryBeforeUpdate, int retries)
+ throws LogNotApplicable, LogWriteTimedOut {
+
+ checkNotNull(sobj);
+ checkNotNull(logValue);
+
+ log.trace("updating {}@{} with {}", sobj.getObjectID(), sobj.getSeqNum(),
+ logValue);
+
+ final Sequencer sequencer = sequencerRuntime.getSequencer(sobj.getObjectID());
+ final IMap<SeqNum, LogValue> logMap = getLogMap(sobj);
+
+ // allocate seq #
+ final SeqNum allocated = sequencer.next();
+ log.trace("allocated {}", allocated);
+
+ // replay
+ if (queryBeforeUpdate) {
+ // FIXME How to handle initial case?
+ queryHelper(sobj, allocated.prev());
+ }
+
+ // test if applicable
+ sobj.acquireWriteLock();
+ try {
+ final boolean isApplicable = sobj.isApplicable(allocated, logValue);
+ if (!isApplicable) {
+ log.trace("log not applicable abondoning {}", allocated);
+ logMap.putIfAbsent(allocated, NoOp.VALUE);
+ throw new LogNotApplicable("Rejected by " + sobj.getObjectID());
+ }
+
+ // write to shared log
+ LogValue existing = logMap.putIfAbsent(allocated, logValue);
+ if (NoOp.VALUE.equals(existing)) {
+ if (retries > 0) {
+ log.trace("write failed due to time out retrying {}", retries);
+ return updateHelper(sobj, logValue, queryBeforeUpdate, retries - 1);
+ }
+ throw new LogWriteTimedOut("Was timed out by other node by " + sobj.getObjectID());
+ }
+
+ // apply to local object
+ sobj.apply(allocated, logValue);
+
+ // Success.
+
+ // FIXME Current design is that any SharedObject instance could
+ // become SnapShot writer/Log cleaner.
+ // We may need provide a way for a SharedObject instance
+ // to declare that it does not want to become
+ // a SnapShot writer (Don't wont unexpected ReadLock, etc.)
+
+ // give hint to snapshot builder
+ if (allocated.longValue() % SNAPSHOT_CHECK_INTERVAL == 0) {
+ // check and create snapshot in background
+ EXECUTOR.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ createSnapShot(sobj);
+ }
+ });
+ }
+ return allocated;
+
+ } finally {
+ sobj.releaseWriteLock();
+ }
+ }
+
+ /**
+ * Updates latest snapshot pointer.
+ * <p>
+ * IFunction to apply to IAtomicLong used as latest snapshot pointer.
+ */
+ public static final class UpdateLatestSnapshot implements
+ IFunction<Long, Long>, DataSerializable {
+
+ private SeqNum ssCreated;
+
+ /**
+ * Updates latest snapshot pointer to {@code ssCreated}.
+ *
+ * @param ssCreated value to update to
+ */
+ public UpdateLatestSnapshot(SeqNum ssCreated) {
+ this.ssCreated = ssCreated;
+ }
+
+ @Override
+ public Long apply(Long input) {
+ final SeqNum in = SeqNum.anyValueOf(input);
+
+ if (ssCreated.compareTo(in) > 0) {
+ // update if snapshot written is >= existing latest SS.
+ return ssCreated.longValue();
+ } else {
+ return input;
+ }
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ out.writeLong(ssCreated.longValue());
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException {
+ this.ssCreated = SeqNum.valueOf(in.readLong());
+ }
+
+ /**
+ * Default constructor for deserialization.
+ */
+ public UpdateLatestSnapshot() {
+ this.ssCreated = SeqNum.INITIAL;
+ }
+ }
+
+ /**
+ * Creates a snapshot of {@link SharedLogObject} if
+ * latest snapshot is old.
+ *
+ * @param sobj {@link SharedLogObject} to create snapshot.
+ */
+ private void createSnapShot(SharedLogObject sobj) {
+ if (sobj == null) {
+ log.warn("Tried to create snapshot of null object. Ignoring.");
+ return;
+ }
+ log.trace("Checking if {} needs a new snapshot.", sobj.getObjectID());
+
+ // XXX lock can be more fine grained if necessary
+ final IAtomicLong rawLatest = getLatestSnapshotStore(sobj.getObjectID());
+ sobj.acquireReadLock();
+ try {
+ final SeqNum current = sobj.getSeqNum();
+ final SeqNum latest = SeqNum.anyValueOf(rawLatest.get());
+
+ // if distance(current - latest) > SNAPSHOT_INTERVAL
+ if (latest.distance(current) < SNAPSHOT_INTERVAL) {
+ // not the time to create snap shot yet
+ log.trace("Skip creating snapshot. current:{}, latest:{}", current, latest);
+ return;
+ }
+ // create SnapShot Bytes
+ final ImmutablePair<SeqNum, ? extends SnapShotValue> snapshot
+ = sobj.createSnapshot();
+
+ // write to SnapShot Map
+ IMap<SeqNum, SnapShotValue> ssMap = getSnapshotMap(sobj);
+ final SeqNum ssCreated = snapshot.getKey();
+ // XXX is there a way to do setIfAbsent
+ ssMap.set(ssCreated, snapshot.getValue());
+ log.info("Created snapshot. {}@{}", sobj.getObjectID(), ssCreated);
+
+ // update latest snapshot pointer
+ rawLatest.alter(new UpdateLatestSnapshot(ssCreated));
+
+ } finally {
+ sobj.releaseReadLock();
+ }
+
+ // XXX Should we be triggering log clean up in same context?
+ IMap<SeqNum, SnapShotValue> ssMap = getSnapshotMap(sobj);
+
+ // note: snapshots is a copy of keySet unlike usual Map
+ Set<SeqNum> snapshots = ssMap.keySet();
+ if (snapshots.size() > MAX_SNAPSHOTS) {
+ // FIXME we should be able to avoid prev() walking if we can
+ // always safely compare orders of SeqNums.
+
+ SeqNum latest = SeqNum.valueOf(rawLatest.get());
+ snapshots.remove(latest);
+ SeqNum remove = latest.prev();
+ while (snapshots.size() > MAX_SNAPSHOTS) {
+ snapshots.remove(remove);
+ remove = remove.prev();
+ }
+
+ // what's still left in `snapshots` is subject to removal.
+ // delete snapshots and find log SeqNum to start removing
+ SeqNum deleteBeforeThis = null;
+ for (SeqNum ss : snapshots) {
+ log.debug("Removed snapshot {}@{}", sobj.getObjectID(), ss);
+ // XXX If we decide to persist snapshots for reference,
+ // evict() ing here is another option.
+ ssMap.delete(ss);
+ if (deleteBeforeThis == null) {
+ deleteBeforeThis = ss;
+ } else {
+ if (deleteBeforeThis.compareTo(ss) > 0) {
+ deleteBeforeThis = ss;
+ }
+ }
+ }
+ if (deleteBeforeThis == null) {
+ // nothing to do
+ return;
+ }
+
+ // start removing log
+ IMap<SeqNum, LogValue> logMap = getLogMap(sobj);
+ // naive log cleaning
+ log.debug("Trimming log before {}@{}...", sobj.getObjectID(), deleteBeforeThis);
+ while (logMap.remove(deleteBeforeThis) != null) {
+ deleteBeforeThis = deleteBeforeThis.prev();
+ }
+ log.debug("Trimming log up to this{}@{}...", sobj.getObjectID(), deleteBeforeThis);
+ }
+ }
+
+ /**
+ * Gets the Log Map for specified shared object.
+ *
+ * @param sobj {@link SharedLogObject}
+ * @return Log Map
+ */
+ private IMap<SeqNum, LogValue> getLogMap(SharedLogObject sobj) {
+ return getLogMap(sobj.getObjectID());
+ }
+
+ /**
+ * Gets the Log Map for specified shared object ID.
+ *
+ * @param oid {@link SharedLogObjectID}
+ * @return Log Map
+ */
+ private IMap<SeqNum, LogValue> getLogMap(SharedLogObjectID oid) {
+ return instance.getMap(LOGMAP_PREFIX + oid.getObjectName());
+ }
+
+ /**
+ * Gets the Snapshot Map for specified shared object.
+ *
+ * @param sobj {@link SharedLogObject}
+ * @return Snapshot Map
+ */
+ private IMap<SeqNum, SnapShotValue> getSnapshotMap(SharedLogObject sobj) {
+ return instance.getMap(SNAPSHOTMAP_PREFIX + sobj.getObjectID().getObjectName());
+ }
+
+ /**
+ * Gets the store holding latest snapshot version.
+ *
+ * @param oid {@link SharedLogObjectID}
+ * @return IAtomicLong
+ */
+ private IAtomicLong getLatestSnapshotStore(SharedLogObjectID oid) {
+ return instance.getAtomicLong(LATEST_SNAPSHOT_PREFIX + oid.getObjectName());
+ }
+
+ @Override
+ public void queryHelper(SharedLogObject sobj) {
+ queryHelper(checkNotNull(sobj), sequencerRuntime.getSequencer(sobj.getObjectID()).get());
+ }
+
+ @Override
+ public void queryHelper(SharedLogObject sobj, SeqNum replayTo) {
+
+ checkNotNull(sobj);
+ checkNotNull(replayTo);
+ // TODO check if sobj.getSeqNum() <= replayTo
+
+ if (sobj.getSeqNum().equals(replayTo)) {
+ // nothing to do.
+ return;
+ }
+
+ log.trace("querying {}@{} to {}", sobj.getObjectID(), sobj.getSeqNum(),
+ replayTo);
+
+ IMap<SeqNum, LogValue> logMap = getLogMap(sobj);
+ final LogMapManager logCache = getLogMapManager(sobj.getObjectID());
+
+ // TODO Consider more fine grained lock if we ever need
+ // SharedLogObject to be exposed to heavy concurrent read/write.
+
+ sobj.acquireWriteLock();
+ try {
+ final SeqNum current = sobj.getSeqNum();
+
+ List<SeqNum> range = getSeqNumRange(current, replayTo);
+ Map<SeqNum, Future<LogValue>> values = new HashMap<>(range.size());
+
+ // FIXME use notification based cache
+
+ // pre-request all range first to pre-populate near cache
+ log.trace("Pre reading range: {}", range);
+ for (SeqNum key : range) {
+ Future<LogValue> value = logCache.getLogValue(key);
+ values.put(key, value);
+ }
+
+ // walk and apply range 1 by 1
+ for (SeqNum key : range) {
+ Future<LogValue> future = values.get(key);
+ LogValue value = null;
+
+ // FIXME handle Recycled Snapshot log entry scenario
+ // when the get result is null it could be waiting for writer
+ // or log has been trimmed.
+
+ final long deadline = System.currentTimeMillis()
+ + LOG_READ_TIMEOUT_MS;
+ if (log.isTraceEnabled()) {
+ log.trace("Try reading {} until {}", key, new DateTime(deadline));
+ }
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ value = future.get(LOG_READ_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS);
+ if (value == null) {
+ log.trace("{} was not there yet retrying.", key);
+ // value not set yet, retry;
+ future = logCache.getLogValue(key);
+ continue;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("queryError retrying", e);
+ future = logCache.getLogValue(key);
+ continue;
+ } catch (TimeoutException e) {
+ log.warn("query timed out", e);
+ }
+ }
+ if (value == null) {
+ log.trace("{} was not found, writer failed?", key);
+ // writer failure scenario handling
+ // mark sequence number null
+ LogValue newVal = logMap.put(key, NoOp.VALUE);
+ if (newVal == null) {
+ // skip this sequence number.
+ continue;
+ }
+ log.trace("{} appeared while invalidating.", newVal);
+ value = newVal;
+ }
+
+ log.trace("applying log {} {}", key, value);
+ // apply log value
+ sobj.apply(key, value);
+ }
+
+ } finally {
+ sobj.releaseWriteLock();
+ log.trace("query done");
+ }
+ }
+
+ /**
+ * Creates a List containing (afterThis, uptoThis].
+ *
+ * @param afterThis beginning of range not including this sequence number
+ * @param upToThis end of range including this sequence number
+ * @return List of SeqNum in order
+ */
+ private List<SeqNum> getSeqNumRange(SeqNum afterThis, SeqNum upToThis) {
+ checkArgument(!upToThis.equals(SeqNum.INITIAL),
+ "upToThis must not be INITIAL");
+
+ // TODO Wasting heap, etc.
+ // If iterable SeqNum is the only thing required,
+ // implement SeqNum range class, etc.
+ List<SeqNum> range = new ArrayList<>();
+ for (SeqNum i = afterThis.next(); !i.equals(upToThis); i = i.next()) {
+ range.add(i);
+ }
+ range.add(upToThis);
+ return range;
+ }
+
+ @Override
+ public void addListener(SharedLogObjectID oid, LogEventListener listener) {
+ LogMapManager hzListener = getLogMapManager(oid);
+ hzListener.addListener(listener);
+
+ getLogMap(oid).addEntryListener(hzListener, true);
+ }
+
+ @Override
+ public void removeListener(SharedLogObjectID oid, LogEventListener listener) {
+ LogMapManager hzListener = getLogMapManager(oid);
+ hzListener.removeListener(listener);
+ }
+
+ /**
+ * Gets the LogMapManager for given {@link SharedLogObjectID}.
+ * <p/>
+ * If listener was not registered, it will create and register a listener.
+ *
+ * @param oid {@link SharedLogObjectID}
+ * @return {@link LogMapManager}
+ */
+ private LogMapManager getLogMapManager(final SharedLogObjectID oid) {
+ LogMapManager listener
+ = ConcurrentUtils.createIfAbsentUnchecked(listenerMap, oid,
+ new ConcurrentInitializer<LogMapManager>() {
+ @Override
+ public LogMapManager get() throws ConcurrentException {
+ IMap<SeqNum, LogValue> logMap = getLogMap(oid);
+ return new LogMapManager(oid, logMap);
+ }
+ });
+ return listener;
+ }
+
+ @Override
+ public SeqNum getLatestSnapShotId(SharedLogObjectID oid) {
+ IAtomicLong latest = getLatestSnapshotStore(oid);
+ final long ssId = latest.get();
+ return SeqNum.anyValueOf(ssId);
+ }
+
+ @Override
+ public boolean resetToSnapShot(SharedLogObject sobj, SeqNum ssId) {
+ checkNotNull(ssId);
+
+ IMap<SeqNum, SnapShotValue> ssMap = getSnapshotMap(sobj);
+ SnapShotValue snapshot = ssMap.get(ssId);
+ if (snapshot == null) {
+ if (ssId.equals(SeqNum.INITIAL)) {
+ snapshot = NoOp.VALUE;
+ } else {
+ log.error("Invalid Snapshot version {}@{} specified",
+ sobj.getObjectID(), ssId);
+ return false;
+ }
+ }
+
+ sobj.reset(ssId, snapshot);
+
+ return true;
+ }
+
+ /**
+ * Gets the LogValues in range ({@code after}, {@code upToThis}].
+ *
+ * @param oid ID of SharedLogObject
+ * @param after sequence number before (typically current sequence number)
+ * @param upToThis last sequence number you want to retrieve
+ * @return List of {@link LogValue}s in specified range
+ */
+ @Override
+ public List<LogValue> getLogRange(final SharedLogObjectID oid,
+ final SeqNum after, final SeqNum upToThis) {
+
+ List<LogValue> logs = new ArrayList<>();
+ final SeqNum oneAfterLast = upToThis.next();
+ LogMapManager logCache = getLogMapManager(oid);
+ // may want to do async pre-fetching if RPC is triggered often
+ for (SeqNum s = after.next(); !s.equals(oneAfterLast); s = s.next()) {
+ try {
+ logs.add(logCache.getLogValue(s).get());
+ // FIXME Need to somehow detect log has been recycled case and fail
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Failed getting log " + oid + "@" + s, e);
+ // Should we retry/[ignore]/throw?
+ }
+ }
+ return logs;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencer.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencer.java
new file mode 100644
index 0000000..7969999
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencer.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.core.IAtomicLong;
+
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+
+/**
+ * Sequencer implemented using Hazelcast IAtomicLong.
+ */
+@Beta
+public final class HazelcastSequencer implements Sequencer {
+
+ private final IAtomicLong value;
+
+ /**
+ * Constructor.
+ *
+ * @param value IAtomicLong to use as Sequencer
+ */
+ public HazelcastSequencer(IAtomicLong value) {
+ this.value = checkNotNull(value);
+ }
+
+ @Override
+ public SeqNum get() {
+ return SeqNum.anyValueOf(value.get());
+ }
+
+ @Override
+ public SeqNum next() {
+ long next = value.incrementAndGet();
+ if (next == SeqNum.ZERO) {
+ next = value.incrementAndGet();
+ }
+ return SeqNum.valueOf(next);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencerRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencerRuntime.java
new file mode 100644
index 0000000..7972b89
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencerRuntime.java
@@ -0,0 +1,35 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.core.HazelcastInstance;
+
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.SequencerRuntime;
+
+/**
+ * Runtime which hands out the Sequencer backed by Hazelcast IAtomicLong.
+ */
+@Beta
+public class HazelcastSequencerRuntime implements SequencerRuntime {
+
+ private final HazelcastInstance instance;
+
+ /**
+ * Constructs SequencerRuntime.
+ *
+ * @param instance HazelcastInstance to use.
+ */
+ public HazelcastSequencerRuntime(HazelcastInstance instance) {
+ this.instance = checkNotNull(instance);
+ }
+
+ @Override
+ public Sequencer getSequencer(SharedLogObjectID id) {
+ return new HazelcastSequencer(
+ instance.getAtomicLong(
+ "sequencer://" + id.getObjectName()));
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogMapManager.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogMapManager.java
new file mode 100644
index 0000000..bcaf07d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogMapManager.java
@@ -0,0 +1,166 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.util.distributed.sharedlog.LogEventListener;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+
+import com.google.common.annotations.Beta;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * LogMapManager.
+ *
+ * - Listener to register to Hazelcast IMap.
+ * - Caches LogValue notification.
+ * - Dispatches LogValue added event to listeners
+ * - TODO health check latest log to detect dropped notification
+ */
+@Beta
+@ThreadSafe
+public class LogMapManager implements EntryListener<SeqNum, LogValue> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(LogMapManager.class);
+
+ // TODO revisit appropriate size. current guess MAX 5k/s * 2
+ private static final int CACHE_SIZE = 10000;
+
+ private final SharedLogObjectID oid;
+ private final IMap<SeqNum, LogValue> logMap;
+
+ // TODO is there a library to do listener management?
+ private CopyOnWriteArrayList<LogEventListener> listeners;
+
+ private Cache<SeqNum, Future<LogValue>> cache;
+
+ // latest log notified to clients
+ private SeqNum lastLog;
+
+ // It might be that only oid is sufficient.
+ /**
+ * Constructor.
+ *
+ * @param oid ID of SharedLogObject this instance is attached
+ * @param logMap Log Map this SharedLogObject use
+ */
+ public LogMapManager(SharedLogObjectID oid,
+ IMap<SeqNum, LogValue> logMap) {
+ this.oid = checkNotNull(oid);
+ this.logMap = checkNotNull(logMap);
+ this.listeners = new CopyOnWriteArrayList<>();
+ this.cache = CacheBuilder.newBuilder()
+ .weakValues()
+ .expireAfterWrite(5, TimeUnit.MINUTES)
+ .concurrencyLevel(1)
+ .maximumSize(CACHE_SIZE)
+ .build();
+ this.lastLog = SeqNum.INITIAL;
+ }
+
+ // called from Hazelcast thread
+ @Override
+ public void entryAdded(EntryEvent<SeqNum, LogValue> event) {
+ // Cache maintenance
+ cache.put(event.getKey(),
+ ConcurrentUtils.constantFuture(event.getValue()));
+
+ // TODO will need suppress mechanism once we have health check
+
+ if (lastLog.compareTo(event.getKey()) < 0) {
+ lastLog = event.getKey();
+ }
+ for (LogEventListener lsnr : listeners) {
+ lsnr.logAdded(event.getKey());
+ }
+ }
+
+ // called from Hazelcast thread
+ @Override
+ public void entryRemoved(EntryEvent<SeqNum, LogValue> event) {
+ // Cache maintenance
+ cache.invalidate(event.getKey());
+
+ // only add will be notified to listeners
+ }
+
+ // called from Hazelcast thread
+ @Override
+ public void entryUpdated(EntryEvent<SeqNum, LogValue> event) {
+ // Cache maintenance
+ cache.put(event.getKey(),
+ ConcurrentUtils.constantFuture(event.getValue()));
+
+ // only add will be notified to listeners
+ }
+
+ // called from Hazelcast thread
+ @Override
+ public void entryEvicted(EntryEvent<SeqNum, LogValue> event) {
+ // Cache maintenance
+ cache.invalidate(event.getKey());
+
+ // only add will be notified to listeners
+ }
+
+ /**
+ * Adds the listener for specified shared object log event.
+ *
+ * @param listener to add
+ */
+ public void addListener(LogEventListener listener) {
+ listeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Removes the listener for specified shared object log event.
+ *
+ * @param listener to add
+ */
+ public void removeListener(LogEventListener listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Reads LogValue from LogMap.
+ * <p>
+ * It will use the cached value if it exist, if not it will
+ * get the value from distributed store.
+ *
+ * @param key log sequence number
+ * @return Future containing log value
+ */
+ public Future<LogValue> getLogValue(final SeqNum key) {
+ try {
+ return cache.get(key, new Callable<Future<LogValue>>() {
+
+ @Override
+ public Future<LogValue> call() throws Exception {
+ return logMap.getAsync(key);
+ }
+ });
+ } catch (ExecutionException e) {
+ log.error("Reading from Log Map failed.", e);
+ // should never happen?
+ return ConcurrentUtils.constantFuture(null);
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogValueSerializer.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogValueSerializer.java
new file mode 100644
index 0000000..87b4e3f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogValueSerializer.java
@@ -0,0 +1,77 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.NoOp;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+/**
+ * Serializer for LogValue.
+ */
+@Beta
+public class LogValueSerializer implements StreamSerializer<LogValue> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(LogValueSerializer.class);
+
+ //
+ // | LogValue type (1 byte) | (type specific) |
+ //
+
+ // 1st byte identifying LogValue type
+ private static final int NO_OP = 0;
+ private static final int BYTE_VALUE = 1;
+
+ @Override
+ public int getTypeId() {
+ return HazelcastSerializationConstants.LOG_VALUE_TYPE_ID;
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ @Override
+ public void write(ObjectDataOutput out, LogValue object) throws IOException {
+
+ if (object instanceof ByteValue) {
+ out.writeByte(BYTE_VALUE);
+ ByteValue bytes = (ByteValue) object;
+ out.writeObject(bytes.getBytes());
+
+ } else if (object instanceof NoOp) {
+ out.writeByte(NO_OP);
+
+ } else {
+ log.error("Unexpected type encountered: {}", object);
+ }
+ }
+
+ @Override
+ public LogValue read(ObjectDataInput in) throws IOException {
+
+ final int typeId = in.readByte();
+ switch(typeId) {
+ case BYTE_VALUE:
+ return new ByteValue(in.<byte[]>readObject());
+
+ case NO_OP:
+ return NoOp.VALUE;
+
+ default:
+ }
+
+ log.error("Unexpected type encountered: {}", typeId);
+ return null;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/LogValue.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/LogValue.java
new file mode 100644
index 0000000..726f37b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/LogValue.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+
+
+// LogMap value transition
+//
+// (null) -> ByteValue : log successfully written
+// (null) -> NoOp : allocated SeqNum abandoned/timed out
+// NoOp -> Recycled : became part of SnapShot
+
+// When log reader encounter null
+// (case 1) wait for writer, if time out invalidate log with NoOp
+// (case 2) too far behind, reader need to reset to snap shot
+// XXX How to distinguish above is TBD. For now assuming case 2 will never happen
+
+/**
+ * Value stored in LogMap.
+ */
+@Beta
+@Immutable
+public interface LogValue {
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/NoOp.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/NoOp.java
new file mode 100644
index 0000000..db1666c
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/NoOp.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import com.google.common.annotations.Beta;
+
+
+// TODO register to Kryo?
+// TODO Not sure if reusing for SnapShotValue is good idea
+/**
+ * Value representing this log entry was abandoned.
+ */
+@Beta
+public final class NoOp implements LogValue, SnapShotValue {
+
+ /**
+ * The NoOp.
+ */
+ public static final NoOp VALUE = new NoOp();
+
+ /**
+ * Constructor.
+ */
+ protected NoOp() {}
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof NoOp) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/SnapShotValue.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/SnapShotValue.java
new file mode 100644
index 0000000..5553c64
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/SnapShotValue.java
@@ -0,0 +1,14 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Value stored in SnapShot Map.
+ */
+@Beta
+@Immutable
+public interface SnapShotValue {
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/StringID.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/StringID.java
new file mode 100644
index 0000000..07588a8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/StringID.java
@@ -0,0 +1,62 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
+
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+
+/**
+ * Simple String implementation of SharedLogObjectID.
+ */
+@Beta
+public class StringID implements SharedLogObjectID {
+
+ private final String id;
+
+ /**
+ * Constructor.
+ *
+ * @param id String
+ */
+ public StringID(final String id) {
+ this.id = checkNotNull(id);
+ checkArgument(!id.isEmpty(), "id cannot be empty String");
+ }
+
+
+ @Override
+ public String getObjectName() {
+ return id;
+ }
+
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ StringID other = (StringID) obj;
+ return Objects.equal(id, other.id);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("id", id)
+ .toString();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/LogBasedRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/LogBasedRuntime.java
new file mode 100644
index 0000000..0cd121e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/LogBasedRuntime.java
@@ -0,0 +1,127 @@
+package net.onrc.onos.core.util.distributed.sharedlog.runtime;
+
+import java.util.List;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.LogEventListener;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObject;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogNotApplicable;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogWriteTimedOut;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+
+
+/**
+ * Runtime to read/write shared object using shared log.
+ */
+@Beta
+@ThreadSafe
+public interface LogBasedRuntime {
+
+ // FIXME better method name
+ /**
+ * Assign or get already assigned SharedLogObject ID.
+ *
+ * @param sharedObjectName The name of the shared log object
+ * @return SharedLogObjectID
+ */
+ public SharedLogObjectID getOid(final String sharedObjectName);
+
+ /**
+ * Apply given logValue to SharedLogObject.
+ *
+ * @param sobj SharedLogObject to manipulate.
+ * @param logValue LogValue to apply
+ * @param queryBeforeUpdate true if log should be replayed to latest before updating
+ * @return sequence number of the new log entry
+ *
+ * @throws LogNotApplicable Thrown when {@code logValue} was not applicable to
+ * SharedLogObject.
+ * @throws LogWriteTimedOut Thrown when write failed due to time out.
+ */
+ public SeqNum updateHelper(final SharedLogObject sobj, final ByteValue logValue,
+ final boolean queryBeforeUpdate)
+ throws LogNotApplicable, LogWriteTimedOut;
+
+ /**
+ * Reads from the shared log and
+ * updates the {@link SharedLogObject} to latest state,
+ * by calling {@link SharedLogObject#apply(SeqNum, LogValue)}.
+ *
+ * @param sobj SharedLogObject to update.
+ * @throws LogNotApplicable
+ */
+ public void queryHelper(final SharedLogObject sobj);
+
+
+ /**
+ * Reads from the shared log and
+ * updates the {@link SharedLogObject} to replayTo,
+ * by calling {@link SharedLogObject#apply(SeqNum, LogValue)}.
+ *
+ * @param sobj SharedLogObject to update.
+ * @param replayTo Sequence number to replay to. (must be <= latest)
+ */
+ public void queryHelper(final SharedLogObject sobj, final SeqNum replayTo);
+
+ // TODO we may need to provide a way to do manual update (getSeqNum, update)
+
+
+ // Snapshot related interfaces
+
+ // TODO should there be a public interface to create snapshot?
+ // If we expose to public, how to determine "latest" may have issue
+
+ /**
+ * Gets the latest snapshot ID.
+ *
+ * @param oid {@link SharedLogObjectID}
+ * @return latest snapshot ID
+ */
+ public SeqNum getLatestSnapShotId(final SharedLogObjectID oid);
+
+ // TODO give me better name for LogBaseRuntime#resetToSnapShot
+ /**
+ * Updates the {@link SharedLogObject} to specified snapshot.
+ *
+ * @param sobj {@link SharedLogObject} to update
+ * @param ssId snap shot ID
+ * @return true if success, false otherwise.
+ */
+ public boolean resetToSnapShot(final SharedLogObject sobj, final SeqNum ssId);
+
+
+ // TODO should LogRuntime be responsible for polling, to detect dropped notification.
+ /**
+ * Adds the listener for specified shared object log event.
+ *
+ * @param oid {@link SharedLogObjectID}
+ * @param listener to add
+ */
+ public void addListener(SharedLogObjectID oid, LogEventListener listener);
+
+ /**
+ * Removes the listener for specified shared object log event.
+ *
+ * @param oid {@link SharedLogObjectID}
+ * @param listener to add
+ */
+ public void removeListener(SharedLogObjectID oid, LogEventListener listener);
+
+ // TODO might not need this any more
+ /**
+ * Gets the LogValues in range ({@code after}, {@code upToThis}].
+ *
+ * @param oid ID of SharedLogObject
+ * @param after sequence number before (typically current sequence number)
+ * @param upToThis last sequence number you want to retrieve
+ * @return List of {@link LogValue}s in specified range
+ */
+ public List<LogValue> getLogRange(final SharedLogObjectID oid,
+ final SeqNum after, final SeqNum upToThis);
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/SequencerRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/SequencerRuntime.java
new file mode 100644
index 0000000..bf432c6
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/SequencerRuntime.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.core.util.distributed.sharedlog.runtime;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+
+/**
+ * Runtime to get Sequencer implementation for given SharedLogObjectID.
+ */
+@Beta
+@ThreadSafe
+public interface SequencerRuntime {
+
+ /**
+ * Gets the Sequencer for specified SharedLogObjectID.
+ *
+ * @param id SharedLogObjectID
+ * @return Sequencer for specified id
+ */
+ public Sequencer getSequencer(SharedLogObjectID id);
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/HazelcastSerializationConstants.java b/src/main/java/net/onrc/onos/core/util/serializers/HazelcastSerializationConstants.java
new file mode 100644
index 0000000..1c8f876
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/serializers/HazelcastSerializationConstants.java
@@ -0,0 +1,33 @@
+package net.onrc.onos.core.util.serializers;
+
+/**
+ * Hazelcast serializer related constants.
+ */
+public final class HazelcastSerializationConstants {
+
+ // Type ID
+
+ /**
+ * Type ID for {@link net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue}.
+ */
+ public static final int VERSIONED_VALUE_TYPE_ID = 1;
+
+ /**
+ * Type ID for {@link net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue}.
+ */
+ public static final int LOG_VALUE_TYPE_ID = 2;
+
+
+ // Factory ID
+
+ // WARNING: hard coded value exist in hazelcast.xml
+ /**
+ * Factory ID for {@link net.onrc.onos.core.datastore.hazelcast.VersionedValueSerializableFactory}.
+ */
+ public static final int VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID = 1;
+
+ /**
+ * Avoid instantiation.
+ */
+ private HazelcastSerializationConstants() {}
+}
diff --git a/src/test/java/net/onrc/onos/core/util/TestUtils.java b/src/test/java/net/onrc/onos/core/util/TestUtils.java
index 98d5d60..8dafad5 100644
--- a/src/test/java/net/onrc/onos/core/util/TestUtils.java
+++ b/src/test/java/net/onrc/onos/core/util/TestUtils.java
@@ -84,7 +84,12 @@
try {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) subject.getClass();
- Method method = clazz.getDeclaredMethod(methodName, paramTypes);
+ final Method method;
+ if (paramTypes == null || paramTypes.length == 0) {
+ method = clazz.getDeclaredMethod(methodName);
+ } else {
+ method = clazz.getDeclaredMethod(methodName, paramTypes);
+ }
method.setAccessible(true);
@SuppressWarnings("unchecked")
diff --git a/src/test/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNumTest.java b/src/test/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNumTest.java
new file mode 100644
index 0000000..9ae7897
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNumTest.java
@@ -0,0 +1,157 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
+
+import org.junit.Test;
+
+import com.google.common.primitives.UnsignedLongs;
+
+/**
+ * Basic {@link SeqNum} class tests.
+ */
+public class SeqNumTest {
+
+ /**
+ * Tests {@link SeqNum#next()} points to next sequence number,
+ * excluding reserved INITIAL value,
+ * and correctly wraps around ULONG_MAX.
+ */
+ @Test
+ public void testNext() {
+ final SeqNum one = SeqNum.INITIAL.next();
+ assertEquals(1L, one.longValue());
+
+ // succ
+ final SeqNum two = one.next();
+ assertEquals(2L, two.longValue());
+
+ // succ wraps around skipping INITIAL
+ final SeqNum max = SeqNum.valueOf(UnsignedLongs.MAX_VALUE);
+ assertEquals(one, max.next());
+ }
+
+ /**
+ * Tests {@link SeqNum#prev()} points to previous sequence number,
+ * excluding reserved INITIAL value,
+ * and correctly wraps around ULONG_MAX.
+ */
+ @Test
+ public void testPrev() {
+
+ // prev
+ final SeqNum two = SeqNum.valueOf(2);
+ final SeqNum one = two.prev();
+ assertEquals(1L, one.longValue());
+
+ final SeqNum max = SeqNum.INITIAL.prev();
+ assertEquals(UnsignedLongs.MAX_VALUE, max.longValue());
+
+ // prev wraps around skipping INITIAL
+ assertEquals(max, one.prev());
+ }
+
+ /**
+ * Tests that SeqNum equals and hashCode.
+ */
+ @Test
+ public void testEqualsObject() {
+ final SeqNum s = SeqNum.valueOf(42L);
+
+ assertTrue(s.equals(s));
+ assertEquals(s.hashCode(), s.hashCode());
+
+ assertTrue(s.equals(s.next().prev()));
+ assertEquals(s.hashCode(), s.next().prev().hashCode());
+
+ assertFalse(s.equals(s.next()));
+ assertFalse(s.equals(s.prev()));
+ assertFalse(s.equals(null));
+ assertFalse(s.equals(Long.valueOf(42L)));
+ }
+
+ /**
+ * Tests that SeqNum is converted to String as unsigned decimal.
+ */
+ @Test
+ public void testToString() {
+ assertEquals("0", SeqNum.INITIAL.toString());
+ assertEquals("1", SeqNum.valueOf(1).toString());
+ // toString format is unsigned decimal string
+ assertEquals("9223372036854775808",
+ SeqNum.valueOf(Long.MAX_VALUE + 1).toString());
+ }
+
+ /**
+ * Tests that comparison works treating long value as point in a ring.
+ */
+ @Test
+ public void testCompareTo() {
+ final SeqNum zero = SeqNum.INITIAL;
+ final SeqNum one = SeqNum.valueOf(1);
+ final SeqNum two = SeqNum.valueOf(2);
+ final SeqNum oneAlt = zero.next();
+ final SeqNum negOne = one.prev();
+
+ // 0 < 1
+ assertThat(zero.compareTo(one), lessThan(0));
+ // 1 > 0
+ assertThat(one.compareTo(zero), greaterThan(0));
+
+ // 0 == 0
+ assertThat(zero.compareTo(zero), equalTo(0));
+ // 1 == 1
+ assertThat(oneAlt.compareTo(one), equalTo(0));
+ assertThat(one.compareTo(oneAlt), equalTo(0));
+
+ // 2 > 1
+ assertThat(two.compareTo(one), greaterThan(0));
+ // 1 < 2
+ assertThat(one.compareTo(two), lessThan(0));
+
+ // (-1) < 1
+ assertThat(negOne.compareTo(one), lessThan(0));
+ // 1 > (-1)
+ assertThat(one.compareTo(negOne), greaterThan(0));
+
+ // (-1) > (-3)
+ assertThat(negOne.compareTo(negOne.prev().prev()), greaterThan(0));
+ // (-3) < (-1)
+ assertThat(negOne.prev().prev().compareTo(negOne), lessThan(0));
+
+ // (-1) > 0 [0 is always the smallest element]
+ assertThat(negOne.compareTo(zero), greaterThan(0));
+ // 0 < (-1) [0 is always the smallest element]
+ assertThat(zero.compareTo(negOne), lessThan(0));
+
+ /// comparison using shorter arc
+
+ // 0 < SLONG_MAX+1(=HALF) [clockwise arc used]
+ assertThat(zero.compareTo(zero.step(Long.MAX_VALUE).next()), lessThan(0));
+ assertThat(zero.step(Long.MAX_VALUE).next().compareTo(zero), greaterThan(0));
+ /// 0 is always compared clock wise (never wraps)
+ // 0 < SLONG_MAX+1(=HALF)+1 [clockwise arc used]
+ assertThat(zero.compareTo(zero.step(Long.MAX_VALUE).next().next()), lessThan(0));
+ assertThat(zero.step(Long.MAX_VALUE).next().next().compareTo(zero), greaterThan(0));
+
+ // 1 < 1+SLONG_MAX(=HALF-1) [clockwise arc used]
+ assertThat(one.compareTo(one.step(Long.MAX_VALUE)), lessThan(0));
+ assertThat(one.step(Long.MAX_VALUE).compareTo(one), greaterThan(0));
+ // 1 < 1+SLONG_MAX+1(=HALF) [SAME, counter-clockwise arc used]
+ assertThat(one.compareTo(one.step(Long.MAX_VALUE).next()), greaterThan(0));
+ assertThat(one.step(Long.MAX_VALUE).next().compareTo(one), lessThan(0));
+ // 1 < 1+SLONG_MAX+2(=HALF+1) [counter-clockwise arc used]
+ assertThat(one.compareTo(one.step(Long.MAX_VALUE).next().next()), greaterThan(0));
+ assertThat(one.step(Long.MAX_VALUE).next().next().compareTo(one), lessThan(0));
+
+ // (-1) < (-1)+SLONG_MAX(=HALF-1) [clockwise arc used]
+ assertThat(negOne.compareTo(negOne.step(Long.MAX_VALUE)), lessThan(0));
+ assertThat(negOne.step(Long.MAX_VALUE).compareTo(negOne), greaterThan(0));
+ // (-1) < (-1)+SLONG_MAX+1(=HALF) [SAME, clockwise arc used]
+ assertThat(negOne.compareTo(negOne.step(Long.MAX_VALUE).next()), lessThan(0));
+ assertThat(negOne.step(Long.MAX_VALUE).next().compareTo(negOne), greaterThan(0));
+ // (-1) > (-1)+SLONG_MAX+2(=HALF+1) [counter-clockwise arc used]
+ assertThat(negOne.compareTo(negOne.step(Long.MAX_VALUE).next().next()), greaterThan(0));
+ assertThat(negOne.step(Long.MAX_VALUE).next().next().compareTo(negOne), lessThan(0));
+ }
+}
diff --git a/src/test/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLongTest.java b/src/test/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLongTest.java
new file mode 100644
index 0000000..91fd2be
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLongTest.java
@@ -0,0 +1,132 @@
+package net.onrc.onos.core.util.distributed.sharedlog.example;
+
+import static org.junit.Assert.*;
+
+import java.util.UUID;
+
+import net.onrc.onos.core.datastore.hazelcast.HZClient;
+import net.onrc.onos.core.util.IntegrationTest;
+import net.onrc.onos.core.util.TestUtils;
+import net.onrc.onos.core.util.distributed.sharedlog.hazelcast.HazelcastRuntime;
+import net.onrc.onos.core.util.distributed.sharedlog.hazelcast.HazelcastSequencerRuntime;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.LogBasedRuntime;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.SequencerRuntime;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Unit test to run LogAtomicLong example.
+ */
+public class LogAtomicLongTest {
+
+ static {
+ // configuration to quickly fall back to instance mode for faster test run
+ System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
+ }
+
+
+ private LogAtomicLong along;
+
+ /**
+ * Create LogAtomicLong instance.
+ */
+ @Before
+ public void setUp() {
+ final HZClient cl = HZClient.getClient();
+ HazelcastInstance hz = TestUtils.callMethod(cl, "getHZInstance", null);
+
+ SequencerRuntime sequencerRuntime = new HazelcastSequencerRuntime(hz);
+ LogBasedRuntime runtime = new HazelcastRuntime(hz, sequencerRuntime);
+
+ String counterName = UUID.randomUUID().toString();
+ along = new LogAtomicLong(runtime, counterName);
+ }
+
+ /**
+ * Test unconditional write.
+ */
+ @Test
+ public void testSet() {
+ along.set(42);
+ assertEquals(42, along.get());
+ along.set(0);
+ assertEquals(0, along.get());
+ }
+
+ /**
+ * Test conditional write.
+ */
+ @Test
+ public void testCompareAndSet() {
+ along.set(42);
+ assertEquals(42, along.get());
+
+ along.compareAndSet(42, 43);
+ assertEquals(43, along.get());
+
+ // should remain unchanged if expectation not met
+ along.compareAndSet(42, 45);
+ assertEquals(43, along.get());
+ }
+
+ /**
+ * Confirm initial value is 0.
+ */
+ @Test
+ public void testGet() {
+ assertEquals(0, along.get());
+ }
+
+ /**
+ * Confirm another instance with same ID observes the same value.
+ */
+ @Test
+ public void testOtherInstance() {
+ along.set(42);
+ assertEquals(42, along.get());
+
+ final HZClient cl = HZClient.getClient();
+ HazelcastInstance hz = TestUtils.callMethod(cl, "getHZInstance", null);
+
+ SequencerRuntime sequencerRuntime = new HazelcastSequencerRuntime(hz);
+ LogBasedRuntime runtime = new HazelcastRuntime(hz, sequencerRuntime);
+
+ LogAtomicLong anotherInstance = new LogAtomicLong(runtime,
+ along.getObjectID());
+ assertEquals(42, anotherInstance.get());
+ }
+
+ /**
+ * Confirm another instance with same ID initializes using snapshot.
+ */
+ @Category(IntegrationTest.class)
+ @Test
+ public void testOtherInstanceFromSnapshot() {
+ along.set(42);
+ assertEquals(42, along.get());
+
+ final HZClient cl = HZClient.getClient();
+ HazelcastInstance hz = TestUtils.callMethod(cl, "getHZInstance", null);
+
+ SequencerRuntime sequencerRuntime = new HazelcastSequencerRuntime(hz);
+ LogBasedRuntime runtime = new HazelcastRuntime(hz, sequencerRuntime);
+
+ // FIXME SNAPSHOT_INTERVAL should be customized to smaller values
+ // write multiple times to trigger snapshot
+ for (int i = 0; i < HazelcastRuntime.SNAPSHOT_INTERVAL + 2; ++i) {
+ along.set(i);
+ }
+ along.set(99);
+ Thread.yield();
+
+ // this instance might start from latest snap shot then replay
+ LogAtomicLong anotherInstance2 = new LogAtomicLong(runtime,
+ along.getObjectID());
+ assertEquals(99, anotherInstance2.get());
+
+ }
+}