SharedLog runtime proto

ONOS-1806, ONOS-1807, ONOS-1808

Change-Id: Ic86cb7bdc6b04a81180ab43afa01c1aea4ac18c7
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
index eaa647c..f725248 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -18,6 +18,7 @@
 import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
 import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
 import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -186,7 +187,7 @@
         // This method is no longer required, if equibalent to the following
         // is defined in hazelcast.xml
         config.addDataSerializableFactoryClass(
-                VersionedValueSerializableFactory.FACTORY_ID,
+                HazelcastSerializationConstants.VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID,
                 VersionedValueSerializableFactory.class);
     }
 
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
index 0164433..d858423 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
@@ -17,6 +17,7 @@
 import net.onrc.onos.core.datastore.ObjectExistsException;
 import net.onrc.onos.core.datastore.WrongVersionException;
 import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -117,12 +118,12 @@
 
         @Override
         public int getFactoryId() {
-            return VersionedValueSerializableFactory.FACTORY_ID;
+            return HazelcastSerializationConstants.VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID;
         }
 
         @Override
         public int getId() {
-            return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+            return HazelcastSerializationConstants.VERSIONED_VALUE_TYPE_ID;
         }
 
         @Override
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
index 7177695..d4ed1b2 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -1,5 +1,7 @@
 package net.onrc.onos.core.datastore.hazelcast;
 
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
+
 import com.hazelcast.nio.serialization.DataSerializableFactory;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 
@@ -8,21 +10,11 @@
  */
 public class VersionedValueSerializableFactory implements
         DataSerializableFactory {
-    // revisit these magic numbers
-    /**
-     * IdentifiedDataSerializable Factory ID.
-     */
-    public static final int FACTORY_ID = 1;
-
-    /**
-     * IdentifiedDataSerializable type ID for HZTable.VersionedValue class.
-     */
-    public static final int VERSIONED_VALUE_ID = 1;
 
     @Override
     public IdentifiedDataSerializable create(final int typeId) {
         switch (typeId) {
-            case VERSIONED_VALUE_ID:
+            case HazelcastSerializationConstants.VERSIONED_VALUE_TYPE_ID:
                 return new HZTable.VersionedValue();
 
             default:
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/ByteValue.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/ByteValue.java
new file mode 100644
index 0000000..26f7e75
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/ByteValue.java
@@ -0,0 +1,45 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+
+// TODO register to Kryo?
+// TODO Should this be final, or should we allow sub-class
+// TODO Not sure if reusing for SnapShotValue is good idea
+/**
+ * Regular Log Map Value.
+ */
+@Beta
+@Immutable
+public final class ByteValue implements LogValue, SnapShotValue {
+
+    private final byte[] bytes;
+
+    /**
+     * Construct ByteValue.
+     *
+     * @param bytes must not be null
+     */
+    public ByteValue(final byte[] bytes) {
+        this.bytes = Arrays.copyOf(checkNotNull(bytes), bytes.length);
+    }
+
+    /**
+     * Gets the stored bytes.
+     *
+     * @return serialized bytes
+     */
+    public byte[] getBytes() {
+        return Arrays.copyOf(bytes, bytes.length);
+    }
+
+    // TODO toString?
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/LogEventListener.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/LogEventListener.java
new file mode 100644
index 0000000..51f9f85
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/LogEventListener.java
@@ -0,0 +1,19 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Listener interface for SharedLogObject consumer.
+ */
+@Beta
+public interface LogEventListener {
+
+    // TODO Whether to expose logValue is TBD
+    // if exposed, one may manually apply logValue without going through runtime
+    /**
+     * Notification for .
+     *
+     * @param seq updated log entry sequence number
+     */
+    public void logAdded(SeqNum seq/*, ByteValue logValue*/);
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNum.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNum.java
new file mode 100644
index 0000000..480138b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SeqNum.java
@@ -0,0 +1,197 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.primitives.UnsignedLongs;
+
+// TODO register to Kryo
+/**
+ * Sequence number used as log entry key.
+ */
+@Beta
+@Immutable
+public final class SeqNum extends Number implements Comparable<SeqNum> {
+
+    /**
+     * Long value reserved for {@link #INITIAL}.
+     */
+    public static final long ZERO = 0L;
+
+    /**
+     * Special sequence number which will never have LogValue.
+     */
+    public static final SeqNum INITIAL = new SeqNum(ZERO);
+
+
+    private final long seq;
+
+    /**
+     * Constructor.
+     *
+     * @param number sequence number.
+     */
+    protected SeqNum(final long number) {
+        this.seq = number;
+    }
+
+    /**
+     * Gets an instance for specified number.
+     *
+     * @param number must not be {@link #ZERO}
+     * @return SeqNum
+     *
+     * @throws IllegalArgumentException if given number was {@link #ZERO}
+     */
+    public static SeqNum valueOf(final long number) {
+        checkArgument(number != ZERO);
+        return new SeqNum(number);
+    }
+
+    // TODO give better name?
+    /**
+     * Gets an instance for specified number.
+     *
+     * @param number sequence number. Can be {@link #ZERO}
+     * @return SeqNum
+     */
+    public static SeqNum anyValueOf(final long number) {
+        return new SeqNum(number);
+    }
+
+    /**
+     * Gets the next sequence number.
+     * <p>
+     * WARN: This is not a atomic sequencer,
+     * this method just returns the next number in sequence.
+     *
+     * @return next sequence number
+     */
+    public SeqNum next() {
+        return step(1);
+    }
+
+    /**
+     * Gets the previous sequence number.
+     * <p>
+     * WARN: This is not a atomic sequencer,
+     * this method just returns the previous number in sequence.
+     *
+     * @return prev sequence number
+     */
+    public SeqNum prev() {
+        return step(-1);
+    }
+
+    /**
+     * Gets the sequence number stepping forward/backward by {@code delta}.
+     *
+     * @param delta step
+     * @return sequence number
+     */
+    public SeqNum step(long delta) {
+        long next = seq + delta;
+        if (next == SeqNum.ZERO) {
+            if (delta >= 0) {
+                return SeqNum.valueOf(next + 1);
+            } else {
+                // XXX Revisit this behavior
+                return SeqNum.valueOf(next - 1);
+            }
+        }
+        return SeqNum.valueOf(next);
+    }
+
+    // TODO comparator which treats long as ring?
+
+    @Override
+    public int intValue() {
+        return UnsignedLong.valueOf(seq).intValue();
+    }
+
+    @Override
+    public long longValue() {
+        return seq;
+    }
+
+    @Override
+    public float floatValue() {
+        return UnsignedLong.valueOf(seq).floatValue();
+    }
+
+    @Override
+    public double doubleValue() {
+        return UnsignedLong.valueOf(seq).doubleValue();
+    }
+
+    @Override
+    public int hashCode() {
+        return Longs.hashCode(seq);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        SeqNum other = (SeqNum) obj;
+        return seq == other.seq;
+    }
+
+    @Override
+    public String toString() {
+        return UnsignedLongs.toString(seq);
+    }
+
+    @Override
+    public int compareTo(final SeqNum o) {
+        checkNotNull(o);
+
+        return Long.signum(-this.distance(o));
+    }
+
+    /**
+     * Returns the distance between other and this SeqNum.
+     *
+     * @param other other SeqNum
+     * @return {@code other - this}
+     *         or -Long.MAX_VALUE, Log.MAX_VALUE if too far apart.
+     */
+    public long distance(final SeqNum other) {
+        checkNotNull(other);
+
+        // distance from INITIAL is always measured clockwise on the ring
+        if (this.equals(INITIAL)) {
+            return (other.seq >= 0) ? other.seq : Long.MAX_VALUE;
+        } else if (other.equals(INITIAL)) {
+            return (this.seq >= 0) ? -this.seq : -Long.MAX_VALUE;
+        }
+
+        /// other cases measures using "shorter" half of the ring
+        final long diff = other.seq - this.seq;
+        if (diff == Long.MIN_VALUE) {
+            // both arc is same distance
+            // treat arc including INITIAL as shorter
+            if (this.seq < 0) {
+                // clock wise arc contain INITIAL
+                return Long.MAX_VALUE;
+            } else {
+                // counter clock wise arc contain INITIAL
+                return -Long.MAX_VALUE;
+            }
+        } else {
+            return diff;
+        }
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/Sequencer.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/Sequencer.java
new file mode 100644
index 0000000..fa63c33
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/Sequencer.java
@@ -0,0 +1,27 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Sequencer for LogBasedRuntime.
+ */
+@Beta
+@ThreadSafe
+public interface Sequencer {
+
+    /**
+     * Gets the current sequence number.
+     *
+     * @return current sequence number
+     */
+    public SeqNum get();
+
+    /**
+     * Gets the next sequence number.
+     *
+     * @return next sequence number
+     */
+    public SeqNum next();
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObject.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObject.java
new file mode 100644
index 0000000..75c155f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObject.java
@@ -0,0 +1,129 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+
+// TODO Should this be abstract class instead, to hide apply/isApplicable?
+// MEMO: Something SharedObject implementor defines
+/**
+ * Shared object backed by shared log structure.
+ *
+ * Object state must be updated only by {@link #apply(SeqNum, LogValue)}
+ */
+@Beta
+@NotThreadSafe // FIXME remove when we make these objects thread safe.
+public interface SharedLogObject {
+
+    /**
+     * ID of this Shared Object.
+     *
+     * @return SharedLogObjectID
+     */
+//    @ThreadSafe // TODO find annotation for method or add javadoc
+    public SharedLogObjectID getObjectID();
+
+    /**
+     * Gets the current log sequence number this instance is on.
+     *
+     * @return Log version of this object.
+     */
+//    @ThreadSafe
+    public SeqNum getSeqNum();
+
+    // FIXME Is there a good way to ensure this will be called only by Runtime
+    /**
+     * Apply changes to the shared object.
+     * <p/>
+     * Developer implementing shared object must implement this method to:
+     * <ul>
+     *  <li>update local instance based on given logValue if ByteValue.</li>
+     *  <li>update SeqNum to given seq.</li>
+     * </ul>
+     * <p/>
+     * Developer must also ensure that this method will never fail.
+     * Any potential error check should be checked beforehand on
+     * {@link #isApplicable(SeqNum, ByteValue)} call.
+     * <p/>
+     * Modification to this shared object instance should only happen
+     * inside this method.
+     * <p/>
+     * This method should only be called by the runtime.
+     *
+     * This method will be called as a side-effect of calling
+     * {@link net.onrc.onos.core.util.distributed.sharedlog.runtime
+     *          .LogBasedRuntime#queryHelper(SharedLogObject)
+     *        LogBasedRuntime#queryHelper(SharedLogObject)}.
+     *
+     * @param seq sequence number of the LogValue
+     * @param logValue {@link ByteValue} to apply or NoOp
+     */
+    @GuardedBy("acquireWriteLock()")
+    void apply(final SeqNum seq, final LogValue logValue);
+
+    /**
+     * Tests if given LogValue is applicable to this .
+     * <p/>
+     * This method will be called before {@link #apply(SeqNum, LogValue)} call.
+     * This method should be implemented to be side-effect free.
+     *
+     * @param seq sequence number of the LogValue
+     * @param logValue LogValue to test
+     * @return true if {@code data} is applicable
+     */
+    @GuardedBy("acquireWriteLock()")
+    public boolean isApplicable(final SeqNum seq, final ByteValue logValue);
+
+
+    // TODO give me better name for SharedLogObject#reset
+    /**
+     * Resets the object to specified snapshot value.
+     *
+     * @param seq Log version of this snapshot
+     * @param ssValue snapshot {@link ByteValue} to apply or NoOp representing initial state.
+     */
+    @GuardedBy("acquireWriteLock()")
+    void reset(final SeqNum seq, final SnapShotValue ssValue);
+
+    /**
+     * Creates a snapshot value of current object.
+     *
+     * @return (current log version, snapshot value)
+     */
+    @GuardedBy("acquireReadLock()")
+    ImmutablePair<SeqNum, ? extends SnapShotValue> createSnapshot();
+
+    /**
+     * Acquires read lock for this object.
+     * <p/>
+     * Note: Lock implementation must be reentrant.
+     */
+    public void acquireReadLock();
+
+    /**
+     * Releases read lock for this object.
+     * <p/>
+     * Note: Lock implementation must be reentrant.
+     */
+    public void releaseReadLock();
+
+    /**
+     * Acquires write lock for this object.
+     * <p/>
+     * Note: Lock implementation must be reentrant.
+     */
+    public void acquireWriteLock();
+
+    /**
+     * Releases write lock for this object.
+     * <p/>
+     * Note: Lock implementation must be reentrant.
+     */
+    public void releaseWriteLock();
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObjectID.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObjectID.java
new file mode 100644
index 0000000..abf9bf0
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/SharedLogObjectID.java
@@ -0,0 +1,19 @@
+package net.onrc.onos.core.util.distributed.sharedlog;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * ID object to identify each SharedLogObject.
+ * <p/>
+ * Class implementing this interface must implement equals and hashCode.
+ */
+@Beta
+public interface SharedLogObjectID {
+
+    /**
+     * String name for SharedLogObject.
+     *
+     * @return String name for SharedLogObject
+     */
+    public String getObjectName();
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLong.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLong.java
new file mode 100644
index 0000000..b2bad85
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/example/LogAtomicLong.java
@@ -0,0 +1,384 @@
+package net.onrc.onos.core.util.distributed.sharedlog.example;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.utils.KryoSerializer;
+import net.onrc.onos.core.datastore.utils.Serializer;
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObject;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogNotApplicable;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogWriteTimedOut;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.NoOp;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.LogBasedRuntime;
+
+/**
+ * Example implementing CAS-able Long as {@link SharedLogObject}.
+ * <p>
+ * This is just an example implementation of SharedLogObject,
+ * not intended for actual use.
+ */
+public class LogAtomicLong implements SharedLogObject {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(LogAtomicLong.class);
+
+    private static final Serializer SERIALIZER = new KryoSerializer(
+            SetEvent.class,
+            CompareAndSetEvent.class);
+
+
+    private final LogBasedRuntime runtime;
+    private final SharedLogObjectID oid;
+
+    private volatile SeqNum current;
+    private long value;
+
+    // adding volatile to `value` is good-enough for this example in reality
+    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private Lock readLock = readWriteLock.readLock();
+    private Lock writeLock = readWriteLock.writeLock();
+
+
+    /**
+     * Constructor to create instance synced to latest snapshot.
+     *
+     * @param runtime {@link LogBasedRuntime} to use
+     * @param name name of this {@link SharedLogObject}
+     */
+    public LogAtomicLong(LogBasedRuntime runtime, String name) {
+        this(runtime, checkNotNull(runtime).getOid(name));
+    }
+
+    /**
+     * Constructor to create instance synced to latest snapshot.
+     *
+     * @param runtime {@link LogBasedRuntime} to use
+     * @param name name of this {@link SharedLogObject}
+     * @param ssId snap shot ID to initialize to
+     */
+    public LogAtomicLong(LogBasedRuntime runtime, String name, SeqNum ssId) {
+        this(runtime, checkNotNull(runtime).getOid(name), ssId);
+    }
+
+    /**
+     * Constructor to create instance synced to latest snapshot.
+     *
+     * @param runtime {@link LogBasedRuntime} to use
+     * @param oid {@link SharedLogObjectID} of this object
+     */
+    protected LogAtomicLong(LogBasedRuntime runtime, SharedLogObjectID oid) {
+        this(runtime, oid, checkNotNull(runtime).getLatestSnapShotId(oid));
+    }
+
+    /**
+     * Constructor to initialize to specified snap shot.
+     *
+     * @param runtime {@link LogBasedRuntime} to use
+     * @param oid {@link SharedLogObjectID} of this object
+     * @param ssId snap shot ID to initialize to
+     */
+    protected LogAtomicLong(LogBasedRuntime runtime, SharedLogObjectID oid, SeqNum ssId) {
+        this.runtime = checkNotNull(runtime);
+        this.oid = checkNotNull(oid);
+        boolean success = runtime.resetToSnapShot(this, ssId);
+        if (!success) {
+            throw new IllegalStateException(
+                    "Failed to initialize to specified Snapshot " + ssId);
+        }
+    }
+
+
+    /**
+     * LogMessage for {@link LogAtomicLong#set(long)}.
+     */
+    private static final class SetEvent {
+
+        private final long newValue;
+
+        /**
+         * Constructor.
+         *
+         * @param newValue value to set
+         */
+        public SetEvent(long newValue) {
+            this.newValue = newValue;
+        }
+
+        /**
+         * Gets the new value to set.
+         *
+         * @return new value
+         */
+        public long getNewValue() {
+            return newValue;
+        }
+
+        /**
+         * Default constructor for deserializer.
+         */
+        @SuppressWarnings("unused")
+        @Deprecated
+        public SetEvent() {
+            this.newValue = 0L;
+        }
+    }
+
+    /**
+     * LogMessage for {@link LogAtomicLong#compareAndSet(long, long)}.
+     */
+    private static final class CompareAndSetEvent {
+
+        private final long expect;
+        private final long update;
+
+        /**
+         * Constructor.
+         *
+         * @param expect expected value
+         * @param update new value
+         */
+        public CompareAndSetEvent(long expect, long update) {
+            this.expect = expect;
+            this.update = update;
+        }
+
+        /**
+         * Gets the expected value.
+         *
+         * @return expected value
+         */
+        public long getExpect() {
+            return expect;
+        }
+
+        /**
+         * Gets the new value to update.
+         *
+         * @return new value to update
+         */
+        public long getUpdate() {
+            return update;
+        }
+
+        /**
+         * Default constructor for deserializer.
+         */
+        @SuppressWarnings("unused")
+        @Deprecated
+        public CompareAndSetEvent() {
+            this.expect = 0L;
+            this.update = 0L;
+        }
+    }
+
+
+    /**
+     * Sets the specified value regardless of current value.
+     *
+     * @param newValue new value to set
+     */
+    public void set(long newValue) {
+        ByteValue newLog = serialize(new SetEvent(newValue));
+
+        try {
+            // no need to replay to latest log (3rd option)
+            runtime.updateHelper(this, newLog, false);
+        } catch (LogNotApplicable e) {
+            log.error("Should never happen", e);
+        } catch (LogWriteTimedOut e) {
+            log.warn("Timeout, retrying", e);
+            set(newValue);
+        }
+    }
+
+    /**
+     * Compare and set new value.
+     *
+     * @param expect value expected
+     * @param update new value
+     * @return true if successfully updated
+     */
+    public boolean compareAndSet(long expect, long update) {
+        ByteValue newLog = serialize(new CompareAndSetEvent(expect, update));
+
+        try {
+            runtime.updateHelper(this, newLog, true);
+            return true;
+        } catch (LogNotApplicable e) {
+            return false;
+        } catch (LogWriteTimedOut e) {
+            log.warn("Timeout, retrying", e);
+            return compareAndSet(expect, update);
+        }
+    }
+
+    /**
+     * Serializes SetEvent to ByteValue.
+     *
+     * @param event {@link SetEvent}
+     * @return {@link ByteValue}
+     */
+    private ByteValue serialize(SetEvent event) {
+        return new ByteValue(SERIALIZER.serialize(event));
+    }
+
+    /**
+     * Serializes CompareAndSetEvent to ByteValue.
+     *
+     * @param event {@link CompareAndSetEvent}
+     * @return {@link ByteValue}
+     */
+    private ByteValue serialize(CompareAndSetEvent event) {
+        return new ByteValue(SERIALIZER.serialize(event));
+    }
+
+    /**
+     * Gets the current value.
+     *
+     * @return current value
+     */
+    public long get() {
+        runtime.queryHelper(this);
+        acquireReadLock();
+        try {
+            return value;
+        } finally {
+            releaseReadLock();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return Long.toString(get());
+    }
+
+    @Override
+    public SharedLogObjectID getObjectID() {
+        return this.oid;
+    }
+
+    @Override
+    public SeqNum getSeqNum() {
+        return this.current;
+    }
+
+    @GuardedBy("writeLock")
+    @Override
+    public void apply(SeqNum seq, LogValue logValue) {
+        // This should be the only method modifying this instance's field
+
+        if (logValue instanceof ByteValue) {
+            ByteValue byteValue = (ByteValue) logValue;
+            final Object event = SERIALIZER.deserialize(byteValue.getBytes());
+            if (event instanceof SetEvent) {
+                applySetEvent(seq, (SetEvent) event);
+            }
+            if (event instanceof CompareAndSetEvent) {
+                applyCompareAndSetEvent(seq, (CompareAndSetEvent) event);
+            }
+
+        } else if (logValue instanceof NoOp) {
+            this.current = seq;
+        }
+    }
+
+    /**
+     * Applies {@link CompareAndSetEvent}.
+     *
+     * @param seq sequence number of event
+     * @param casEvent {@link CompareAndSetEvent}
+     */
+    @GuardedBy("writeLock")
+    private void applyCompareAndSetEvent(SeqNum seq, final CompareAndSetEvent casEvent) {
+        assert (this.value == casEvent.expect);
+        this.current = seq;
+        this.value = casEvent.getUpdate();
+    }
+
+    /**
+     * Applies {@link SetEvent}.
+     *
+     * @param seq sequence number of event
+     * @param setEvent {@link SetEvent}
+     */
+    @GuardedBy("writeLock")
+    private void applySetEvent(SeqNum seq, final SetEvent setEvent) {
+        this.current = seq;
+        this.value = setEvent.getNewValue();
+    }
+
+    @GuardedBy("writeLock")
+    @Override
+    public void reset(SeqNum seq, SnapShotValue ssValue) {
+        if (ssValue instanceof ByteValue) {
+            ByteValue byteValue = (ByteValue) ssValue;
+            final Object event = SERIALIZER.deserialize(byteValue.getBytes());
+            if (event instanceof SetEvent) {
+                applySetEvent(seq, (SetEvent) event);
+            } else {
+                log.error("Unexpected SnapShot ByteValue encountered {}", event);
+            }
+
+        } else if (ssValue instanceof NoOp) {
+            this.current = seq;
+            this.value = 0L;
+        } else {
+            log.error("Unexpected SnapShotValue encountered {}", ssValue);
+        }
+    }
+
+    @GuardedBy("readLock")
+    @Override
+    public ImmutablePair<SeqNum, ? extends SnapShotValue> createSnapshot() {
+        return ImmutablePair.of(getSeqNum(), serialize(new SetEvent(get())));
+    }
+
+    @GuardedBy("readLock")
+    @Override
+    public boolean isApplicable(SeqNum seq, ByteValue logValue) {
+
+        final Object event = SERIALIZER.deserialize(logValue.getBytes());
+        if (event instanceof SetEvent) {
+            return true;
+        }
+        if (event instanceof CompareAndSetEvent) {
+            CompareAndSetEvent casEvent = (CompareAndSetEvent) event;
+            return casEvent.getExpect() == value;
+        }
+        return false;
+    }
+
+    @Override
+    public void acquireReadLock() {
+        this.readLock.lock();
+    }
+
+    @Override
+    public void releaseReadLock() {
+        this.readLock.unlock();
+    }
+
+    @Override
+    public void acquireWriteLock() {
+        this.writeLock.lock();
+    }
+
+    @Override
+    public void releaseWriteLock() {
+        this.writeLock.unlock();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotApplicable.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotApplicable.java
new file mode 100644
index 0000000..f41f11e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotApplicable.java
@@ -0,0 +1,51 @@
+package net.onrc.onos.core.util.distributed.sharedlog.exception;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Exception thrown, when log cannot be applied to the shared log object.
+ */
+@Beta
+public class LogNotApplicable extends Exception {
+
+    private static final long serialVersionUID = 670547401260137360L;
+
+    /**
+     * {@link Exception#Exception()}.
+     */
+    public LogNotApplicable() {
+        super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.
+     * {@link Exception#Exception(String)}
+     *
+     * @param message failure description
+     */
+    public LogNotApplicable(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause.
+     * {@link Exception#Exception(Throwable)}
+     *
+     * @param cause exception causing this.
+     */
+    public LogNotApplicable(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     * {@link Exception#Exception(String, Throwable)}
+     *
+     * @param message failure description
+     * @param cause exception causing this.
+     */
+    public LogNotApplicable(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotContiguous.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotContiguous.java
new file mode 100644
index 0000000..a68ec4a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogNotContiguous.java
@@ -0,0 +1,65 @@
+package net.onrc.onos.core.util.distributed.sharedlog.exception;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+
+/**
+ * Exception thrown, when log can no longer be replayed.
+ * Caller need to jump to the available snapshot and restart from snapshot.
+ */
+@Beta
+public class LogNotContiguous extends Exception {
+
+    /**
+     * Construct a new exception.
+     *
+     * @param failed sequence number which failed
+     */
+    public LogNotContiguous(final SeqNum failed) {
+        this(failed + " cannot be read");
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.
+     * {@link Exception#Exception(String)}
+     *
+     * @param message failure description
+     */
+    protected LogNotContiguous(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause.
+     * {@link Exception#Exception(Throwable)}
+     *
+     * @param failed sequence number which failed
+     * @param cause exception causing this.
+     */
+    public LogNotContiguous(SeqNum failed, Throwable cause) {
+        this(failed + " cannot be read", cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause.
+     * {@link Exception#Exception(Throwable)}
+     *
+     * @param cause exception causing this.
+     */
+    protected LogNotContiguous(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     * {@link Exception#Exception(String, Throwable)}
+     *
+     * @param message failure description
+     * @param cause exception causing this.
+     */
+    public LogNotContiguous(String message, Throwable cause) {
+        super(message, cause);
+        // TODO Auto-generated constructor stub
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogWriteTimedOut.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogWriteTimedOut.java
new file mode 100644
index 0000000..4e9c38d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/exception/LogWriteTimedOut.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.core.util.distributed.sharedlog.exception;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Exception thrown, when allocated sequence number timed out.
+ */
+@Beta
+public class LogWriteTimedOut extends Exception {
+
+    /**
+     * {@link Exception#Exception()}.
+     */
+    public LogWriteTimedOut() {
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.
+     * {@link Exception#Exception(String)}
+     *
+     * @param message failure description
+     */
+    public LogWriteTimedOut(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause.
+     * {@link Exception#Exception(Throwable)}
+     *
+     * @param cause exception causing this.
+     */
+    public LogWriteTimedOut(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     * {@link Exception#Exception(String, Throwable)}
+     *
+     * @param message failure description
+     * @param cause exception causing this.
+     */
+    public LogWriteTimedOut(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastRuntime.java
new file mode 100644
index 0000000..3becaa9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastRuntime.java
@@ -0,0 +1,621 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.LogEventListener;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObject;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogNotApplicable;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogWriteTimedOut;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.SnapShotValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.StringID;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.NoOp;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.LogBasedRuntime;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.SequencerRuntime;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.IFunction;
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+
+
+/**
+ * LogBasedRuntime using Hazelcast.
+ */
+@Beta
+public class HazelcastRuntime implements LogBasedRuntime {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(HazelcastRuntime.class);
+
+    // Hazelcast distributed object name prefixes
+    private static final String LATEST_SNAPSHOT_PREFIX = "latestSS://";
+    private static final String SNAPSHOTMAP_PREFIX = "ssmap://";
+    private static final String LOGMAP_PREFIX = "logmap://";
+
+    // FIXME set appropriate timeout & make it configurable
+    private static final long LOG_READ_TIMEOUT_MS = 1000L;
+
+    // FIXME make these configurable from file, etc.
+    /**
+     * Number of times to retry, if write to LogMap failed due to
+     * other node invalidated the sequencer due to timeout.
+     */
+    private static final int UPDATE_RETRIES = 5;
+
+    /**
+     * Interval to check if we should create SnapShot.
+     * <p>
+     * If the sequence number of the log just written is multiple of this value,
+     * SnapShot check and log cleanup will be triggered.
+     */
+    private static final int SNAPSHOT_CHECK_INTERVAL = 50;
+
+    /**
+     * Interval SnapShot should be created.
+     * <p>
+     * SnapShot will be created if latest SnapShot was more than
+     * this interval apart from last written.
+     */
+    public static final int SNAPSHOT_INTERVAL = 500;
+
+    /**
+     * Maximum number of Snapshots to store in distributed Map.
+     */
+    public static final int MAX_SNAPSHOTS = 10;
+
+    // TODO create ThreadFactory to assign thread name and lower priority
+    // executor for SnapShot builder
+    private static final ExecutorService EXECUTOR =
+            new ThreadPoolExecutor(1, 5, 3L, TimeUnit.MINUTES,
+                    new ArrayBlockingQueue<Runnable>(10),
+                    new ThreadPoolExecutor.DiscardOldestPolicy());
+
+    private final HazelcastInstance instance;
+    private final SequencerRuntime sequencerRuntime;
+
+    private final ConcurrentMap<SharedLogObjectID, LogMapManager>
+                    listenerMap;
+
+    /**
+     * Initializes Hazelcast based LogRuntime.
+     *
+     * @param instance Hazelcast instance to use
+     * @param sequencerRuntime {@link SequencerRuntime} to use
+     */
+    public HazelcastRuntime(HazelcastInstance instance,
+                            SequencerRuntime sequencerRuntime) {
+
+        this.instance = checkNotNull(instance);
+        this.sequencerRuntime = checkNotNull(sequencerRuntime);
+        this.listenerMap = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public SharedLogObjectID getOid(String sharedObjectName) {
+
+        // TODO Create specialized SharedLogObject, if getting IMap from
+        //      HZ instance each time becomes a significant overhead.
+
+        return new StringID(sharedObjectName);
+    }
+
+    @Override
+    public SeqNum updateHelper(SharedLogObject sobj, ByteValue logValue,
+                                boolean queryBeforeUpdate)
+                                   throws LogNotApplicable, LogWriteTimedOut {
+
+        return updateHelper(sobj, logValue, queryBeforeUpdate, UPDATE_RETRIES);
+    }
+
+    /**
+     * Apply given logValue to SharedLogObject.
+     *
+     * @param sobj SharedLogObject to manipulate.
+     * @param logValue LogValue to apply
+     * @param queryBeforeUpdate true if log should be replayed to latest before updating
+     * @param retries number of retries
+     * @return sequence number of the new log entry
+     *
+     * @throws LogNotApplicable Thrown when {@code logValue} was not applicable to
+     *          SharedLogObject.
+     * @throws LogWriteTimedOut Thrown when write failed due to time out.
+     */
+    private SeqNum updateHelper(final SharedLogObject sobj, ByteValue logValue,
+                                boolean queryBeforeUpdate, int retries)
+                                   throws LogNotApplicable, LogWriteTimedOut {
+
+        checkNotNull(sobj);
+        checkNotNull(logValue);
+
+        log.trace("updating {}@{} with {}", sobj.getObjectID(), sobj.getSeqNum(),
+                    logValue);
+
+        final Sequencer sequencer = sequencerRuntime.getSequencer(sobj.getObjectID());
+        final IMap<SeqNum, LogValue> logMap = getLogMap(sobj);
+
+        // allocate seq #
+        final SeqNum allocated = sequencer.next();
+        log.trace("allocated {}", allocated);
+
+        // replay
+        if (queryBeforeUpdate) {
+            // FIXME How to handle initial case?
+            queryHelper(sobj, allocated.prev());
+        }
+
+        // test if applicable
+        sobj.acquireWriteLock();
+        try {
+            final boolean isApplicable = sobj.isApplicable(allocated, logValue);
+            if (!isApplicable) {
+                log.trace("log not applicable abondoning {}", allocated);
+                logMap.putIfAbsent(allocated, NoOp.VALUE);
+                throw new LogNotApplicable("Rejected by " + sobj.getObjectID());
+            }
+
+            // write to shared log
+            LogValue existing = logMap.putIfAbsent(allocated, logValue);
+            if (NoOp.VALUE.equals(existing)) {
+                if (retries > 0) {
+                    log.trace("write failed due to time out retrying {}", retries);
+                    return updateHelper(sobj, logValue, queryBeforeUpdate, retries - 1);
+                }
+                throw new LogWriteTimedOut("Was timed out by other node by " + sobj.getObjectID());
+            }
+
+            // apply to local object
+            sobj.apply(allocated, logValue);
+
+            // Success.
+
+            // FIXME Current design is that any SharedObject instance could
+            // become SnapShot writer/Log cleaner.
+            // We may need provide a way for a SharedObject instance
+            // to declare that it does not want to become
+            // a SnapShot writer (Don't wont unexpected ReadLock, etc.)
+
+            // give hint to snapshot builder
+            if (allocated.longValue() % SNAPSHOT_CHECK_INTERVAL == 0) {
+                // check and create snapshot in background
+                EXECUTOR.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        createSnapShot(sobj);
+                    }
+                });
+            }
+            return allocated;
+
+        } finally {
+            sobj.releaseWriteLock();
+        }
+    }
+
+    /**
+     * Updates latest snapshot pointer.
+     * <p>
+     * IFunction to apply to IAtomicLong used as latest snapshot pointer.
+     */
+    public static final class UpdateLatestSnapshot implements
+                                IFunction<Long, Long>, DataSerializable {
+
+        private SeqNum ssCreated;
+
+        /**
+         * Updates latest snapshot pointer to {@code ssCreated}.
+         *
+         * @param ssCreated value to update to
+         */
+        public UpdateLatestSnapshot(SeqNum ssCreated) {
+            this.ssCreated = ssCreated;
+        }
+
+        @Override
+        public Long apply(Long input) {
+            final SeqNum in = SeqNum.anyValueOf(input);
+
+            if (ssCreated.compareTo(in) > 0) {
+                // update if snapshot written is >= existing latest SS.
+                return ssCreated.longValue();
+            } else {
+                return input;
+            }
+        }
+
+        @Override
+        public void writeData(ObjectDataOutput out) throws IOException {
+            out.writeLong(ssCreated.longValue());
+        }
+
+        @Override
+        public void readData(ObjectDataInput in) throws IOException {
+            this.ssCreated = SeqNum.valueOf(in.readLong());
+        }
+
+        /**
+         * Default constructor for deserialization.
+         */
+        public UpdateLatestSnapshot() {
+            this.ssCreated = SeqNum.INITIAL;
+        }
+    }
+
+    /**
+     * Creates a snapshot of {@link SharedLogObject} if
+     * latest snapshot is old.
+     *
+     * @param sobj {@link SharedLogObject} to create snapshot.
+     */
+    private void createSnapShot(SharedLogObject sobj) {
+        if (sobj == null) {
+            log.warn("Tried to create snapshot of null object. Ignoring.");
+            return;
+        }
+        log.trace("Checking if {} needs a new snapshot.", sobj.getObjectID());
+
+        // XXX lock can be more fine grained if necessary
+        final IAtomicLong rawLatest = getLatestSnapshotStore(sobj.getObjectID());
+        sobj.acquireReadLock();
+        try {
+            final SeqNum current = sobj.getSeqNum();
+            final SeqNum latest = SeqNum.anyValueOf(rawLatest.get());
+
+            // if distance(current - latest) > SNAPSHOT_INTERVAL
+            if (latest.distance(current) < SNAPSHOT_INTERVAL) {
+                // not the time to create snap shot yet
+                log.trace("Skip creating snapshot. current:{}, latest:{}", current, latest);
+                return;
+            }
+            // create SnapShot Bytes
+            final ImmutablePair<SeqNum, ? extends SnapShotValue> snapshot
+                = sobj.createSnapshot();
+
+            // write to SnapShot Map
+            IMap<SeqNum, SnapShotValue> ssMap = getSnapshotMap(sobj);
+            final SeqNum ssCreated = snapshot.getKey();
+            // XXX is there a way to do setIfAbsent
+            ssMap.set(ssCreated, snapshot.getValue());
+            log.info("Created snapshot. {}@{}", sobj.getObjectID(), ssCreated);
+
+            // update latest snapshot pointer
+            rawLatest.alter(new UpdateLatestSnapshot(ssCreated));
+
+        } finally {
+            sobj.releaseReadLock();
+        }
+
+        // XXX Should we be triggering log clean up in same context?
+        IMap<SeqNum, SnapShotValue> ssMap = getSnapshotMap(sobj);
+
+        // note: snapshots is a copy of keySet unlike usual Map
+        Set<SeqNum> snapshots = ssMap.keySet();
+        if (snapshots.size() > MAX_SNAPSHOTS) {
+            // FIXME we should be able to avoid prev() walking if we can
+            // always safely compare orders of SeqNums.
+
+            SeqNum latest = SeqNum.valueOf(rawLatest.get());
+            snapshots.remove(latest);
+            SeqNum remove = latest.prev();
+            while (snapshots.size() > MAX_SNAPSHOTS) {
+                snapshots.remove(remove);
+                remove = remove.prev();
+            }
+
+            // what's still left in `snapshots` is subject to removal.
+            // delete snapshots and find log SeqNum to start removing
+            SeqNum deleteBeforeThis = null;
+            for (SeqNum ss : snapshots) {
+                log.debug("Removed snapshot {}@{}", sobj.getObjectID(), ss);
+                // XXX If we decide to persist snapshots for reference,
+                //     evict() ing here is another option.
+                ssMap.delete(ss);
+                if (deleteBeforeThis == null) {
+                    deleteBeforeThis = ss;
+                } else {
+                    if (deleteBeforeThis.compareTo(ss) > 0) {
+                        deleteBeforeThis = ss;
+                    }
+                }
+            }
+            if (deleteBeforeThis == null) {
+                // nothing to do
+                return;
+            }
+
+            // start removing log
+            IMap<SeqNum, LogValue> logMap = getLogMap(sobj);
+            // naive log cleaning
+            log.debug("Trimming log before {}@{}...", sobj.getObjectID(), deleteBeforeThis);
+            while (logMap.remove(deleteBeforeThis) != null) {
+                deleteBeforeThis = deleteBeforeThis.prev();
+            }
+            log.debug("Trimming log up to this{}@{}...", sobj.getObjectID(), deleteBeforeThis);
+        }
+    }
+
+    /**
+     * Gets the Log Map for specified shared object.
+     *
+     * @param sobj {@link SharedLogObject}
+     * @return Log Map
+     */
+    private IMap<SeqNum, LogValue> getLogMap(SharedLogObject sobj) {
+        return getLogMap(sobj.getObjectID());
+    }
+
+    /**
+     * Gets the Log Map for specified shared object ID.
+     *
+     * @param oid {@link SharedLogObjectID}
+     * @return Log Map
+     */
+    private IMap<SeqNum, LogValue> getLogMap(SharedLogObjectID oid) {
+        return instance.getMap(LOGMAP_PREFIX + oid.getObjectName());
+    }
+
+    /**
+     * Gets the Snapshot Map for specified shared object.
+     *
+     * @param sobj {@link SharedLogObject}
+     * @return Snapshot Map
+     */
+    private IMap<SeqNum, SnapShotValue> getSnapshotMap(SharedLogObject sobj) {
+        return instance.getMap(SNAPSHOTMAP_PREFIX + sobj.getObjectID().getObjectName());
+    }
+
+    /**
+     * Gets the store holding latest snapshot version.
+     *
+     * @param oid {@link SharedLogObjectID}
+     * @return IAtomicLong
+     */
+    private IAtomicLong getLatestSnapshotStore(SharedLogObjectID oid) {
+        return instance.getAtomicLong(LATEST_SNAPSHOT_PREFIX + oid.getObjectName());
+    }
+
+    @Override
+    public void queryHelper(SharedLogObject sobj) {
+        queryHelper(checkNotNull(sobj), sequencerRuntime.getSequencer(sobj.getObjectID()).get());
+    }
+
+    @Override
+    public void queryHelper(SharedLogObject sobj, SeqNum replayTo) {
+
+        checkNotNull(sobj);
+        checkNotNull(replayTo);
+        // TODO check if sobj.getSeqNum() <= replayTo
+
+        if (sobj.getSeqNum().equals(replayTo)) {
+            // nothing to do.
+            return;
+        }
+
+        log.trace("querying {}@{} to {}", sobj.getObjectID(), sobj.getSeqNum(),
+                                            replayTo);
+
+        IMap<SeqNum, LogValue> logMap = getLogMap(sobj);
+        final LogMapManager logCache = getLogMapManager(sobj.getObjectID());
+
+        // TODO Consider more fine grained lock if we ever need
+        //   SharedLogObject to be exposed to heavy concurrent read/write.
+
+        sobj.acquireWriteLock();
+        try {
+            final SeqNum current = sobj.getSeqNum();
+
+            List<SeqNum> range = getSeqNumRange(current, replayTo);
+            Map<SeqNum, Future<LogValue>> values = new HashMap<>(range.size());
+
+            // FIXME use notification based cache
+
+            // pre-request all range first to pre-populate near cache
+            log.trace("Pre reading range: {}", range);
+            for (SeqNum key : range) {
+                Future<LogValue> value = logCache.getLogValue(key);
+                values.put(key, value);
+            }
+
+            // walk and apply range 1 by 1
+            for (SeqNum key : range) {
+                Future<LogValue> future = values.get(key);
+                LogValue value = null;
+
+                // FIXME handle Recycled Snapshot log entry scenario
+                // when the get result is null it could be waiting for writer
+                // or log has been trimmed.
+
+                final long deadline = System.currentTimeMillis()
+                                        + LOG_READ_TIMEOUT_MS;
+                if (log.isTraceEnabled()) {
+                    log.trace("Try reading {} until {}", key, new DateTime(deadline));
+                }
+                while (System.currentTimeMillis() < deadline) {
+                    try {
+                        value = future.get(LOG_READ_TIMEOUT_MS,
+                                                    TimeUnit.MILLISECONDS);
+                        if (value == null) {
+                            log.trace("{} was not there yet retrying.", key);
+                            // value not set yet, retry;
+                            future = logCache.getLogValue(key);
+                            continue;
+                        }
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.error("queryError retrying", e);
+                        future = logCache.getLogValue(key);
+                        continue;
+                    } catch (TimeoutException e) {
+                        log.warn("query timed out", e);
+                    }
+                }
+                if (value == null) {
+                    log.trace("{} was not found, writer failed?", key);
+                    // writer failure scenario handling
+                    //  mark sequence number null
+                    LogValue newVal = logMap.put(key, NoOp.VALUE);
+                    if (newVal == null) {
+                        // skip this sequence number.
+                        continue;
+                    }
+                    log.trace("{} appeared while invalidating.", newVal);
+                    value = newVal;
+                }
+
+                log.trace("applying log {} {}", key, value);
+                // apply log value
+                sobj.apply(key, value);
+            }
+
+        } finally {
+            sobj.releaseWriteLock();
+            log.trace("query done");
+        }
+    }
+
+    /**
+     * Creates a List containing (afterThis, uptoThis].
+     *
+     * @param afterThis beginning of range not including this sequence number
+     * @param upToThis end of range including this sequence number
+     * @return List of SeqNum in order
+     */
+    private List<SeqNum> getSeqNumRange(SeqNum afterThis, SeqNum upToThis) {
+        checkArgument(!upToThis.equals(SeqNum.INITIAL),
+                        "upToThis must not be INITIAL");
+
+        // TODO Wasting heap, etc.
+        // If iterable SeqNum is the only thing required,
+        // implement SeqNum range class, etc.
+        List<SeqNum> range = new ArrayList<>();
+        for (SeqNum i = afterThis.next(); !i.equals(upToThis); i = i.next()) {
+            range.add(i);
+        }
+        range.add(upToThis);
+        return range;
+    }
+
+    @Override
+    public void addListener(SharedLogObjectID oid, LogEventListener listener) {
+        LogMapManager hzListener = getLogMapManager(oid);
+        hzListener.addListener(listener);
+
+        getLogMap(oid).addEntryListener(hzListener, true);
+    }
+
+    @Override
+    public void removeListener(SharedLogObjectID oid, LogEventListener listener) {
+        LogMapManager hzListener = getLogMapManager(oid);
+        hzListener.removeListener(listener);
+    }
+
+    /**
+     * Gets the LogMapManager for given {@link SharedLogObjectID}.
+     * <p/>
+     * If listener was not registered, it will create and register a listener.
+     *
+     * @param oid {@link SharedLogObjectID}
+     * @return {@link LogMapManager}
+     */
+    private LogMapManager getLogMapManager(final SharedLogObjectID oid) {
+        LogMapManager listener
+            = ConcurrentUtils.createIfAbsentUnchecked(listenerMap, oid,
+                new ConcurrentInitializer<LogMapManager>() {
+                    @Override
+                    public LogMapManager get() throws ConcurrentException {
+                        IMap<SeqNum, LogValue> logMap = getLogMap(oid);
+                        return new LogMapManager(oid, logMap);
+                    }
+                });
+        return listener;
+    }
+
+    @Override
+    public SeqNum getLatestSnapShotId(SharedLogObjectID oid) {
+        IAtomicLong latest = getLatestSnapshotStore(oid);
+        final long ssId = latest.get();
+        return SeqNum.anyValueOf(ssId);
+    }
+
+    @Override
+    public boolean resetToSnapShot(SharedLogObject sobj, SeqNum ssId) {
+        checkNotNull(ssId);
+
+        IMap<SeqNum, SnapShotValue> ssMap = getSnapshotMap(sobj);
+        SnapShotValue snapshot = ssMap.get(ssId);
+        if (snapshot == null) {
+            if (ssId.equals(SeqNum.INITIAL)) {
+                snapshot = NoOp.VALUE;
+            } else {
+                log.error("Invalid Snapshot version {}@{} specified",
+                                    sobj.getObjectID(), ssId);
+                return false;
+            }
+        }
+
+        sobj.reset(ssId, snapshot);
+
+        return true;
+    }
+
+    /**
+     * Gets the LogValues in range ({@code after}, {@code upToThis}].
+     *
+     * @param oid ID of SharedLogObject
+     * @param after sequence number before (typically current sequence number)
+     * @param upToThis last sequence number you want to retrieve
+     * @return List of {@link LogValue}s in specified range
+     */
+    @Override
+    public List<LogValue> getLogRange(final SharedLogObjectID oid,
+                                    final SeqNum after, final SeqNum upToThis) {
+
+        List<LogValue> logs = new ArrayList<>();
+        final SeqNum oneAfterLast = upToThis.next();
+        LogMapManager logCache = getLogMapManager(oid);
+        // may want to do async pre-fetching if RPC is triggered often
+        for (SeqNum s = after.next(); !s.equals(oneAfterLast); s = s.next()) {
+            try {
+                logs.add(logCache.getLogValue(s).get());
+                // FIXME Need to somehow detect log has been recycled case and fail
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Failed getting log " + oid + "@" + s, e);
+                // Should we retry/[ignore]/throw?
+            }
+        }
+        return logs;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencer.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencer.java
new file mode 100644
index 0000000..7969999
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencer.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.core.IAtomicLong;
+
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+
+/**
+ * Sequencer implemented using Hazelcast IAtomicLong.
+ */
+@Beta
+public final class HazelcastSequencer implements Sequencer {
+
+    private final IAtomicLong value;
+
+    /**
+     * Constructor.
+     *
+     * @param value IAtomicLong to use as Sequencer
+     */
+    public HazelcastSequencer(IAtomicLong value) {
+        this.value = checkNotNull(value);
+    }
+
+    @Override
+    public SeqNum get() {
+        return SeqNum.anyValueOf(value.get());
+    }
+
+    @Override
+    public SeqNum next() {
+        long next = value.incrementAndGet();
+        if (next == SeqNum.ZERO) {
+            next = value.incrementAndGet();
+        }
+        return SeqNum.valueOf(next);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencerRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencerRuntime.java
new file mode 100644
index 0000000..7972b89
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/HazelcastSequencerRuntime.java
@@ -0,0 +1,35 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.core.HazelcastInstance;
+
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.runtime.SequencerRuntime;
+
+/**
+ * Runtime which hands out the Sequencer backed by Hazelcast IAtomicLong.
+ */
+@Beta
+public class HazelcastSequencerRuntime implements SequencerRuntime {
+
+    private final HazelcastInstance instance;
+
+    /**
+     * Constructs SequencerRuntime.
+     *
+     * @param instance HazelcastInstance to use.
+     */
+    public HazelcastSequencerRuntime(HazelcastInstance instance) {
+        this.instance = checkNotNull(instance);
+    }
+
+    @Override
+    public Sequencer getSequencer(SharedLogObjectID id) {
+        return new HazelcastSequencer(
+                instance.getAtomicLong(
+                        "sequencer://" + id.getObjectName()));
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogMapManager.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogMapManager.java
new file mode 100644
index 0000000..bcaf07d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogMapManager.java
@@ -0,0 +1,166 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.util.distributed.sharedlog.LogEventListener;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+
+import com.google.common.annotations.Beta;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * LogMapManager.
+ *
+ * - Listener to register to Hazelcast IMap.
+ * - Caches LogValue notification.
+ * - Dispatches LogValue added event to listeners
+ * - TODO health check latest log to detect dropped notification
+ */
+@Beta
+@ThreadSafe
+public class LogMapManager implements EntryListener<SeqNum, LogValue> {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(LogMapManager.class);
+
+    // TODO revisit appropriate size. current guess MAX 5k/s * 2
+    private static final int CACHE_SIZE = 10000;
+
+    private final SharedLogObjectID oid;
+    private final IMap<SeqNum, LogValue> logMap;
+
+    // TODO is there a library to do listener management?
+    private CopyOnWriteArrayList<LogEventListener> listeners;
+
+    private Cache<SeqNum, Future<LogValue>> cache;
+
+    // latest log notified to clients
+    private SeqNum lastLog;
+
+    // It might be that only oid is sufficient.
+    /**
+     * Constructor.
+     *
+     * @param oid ID of SharedLogObject this instance is attached
+     * @param logMap Log Map this SharedLogObject use
+     */
+    public LogMapManager(SharedLogObjectID oid,
+                                  IMap<SeqNum, LogValue> logMap) {
+        this.oid = checkNotNull(oid);
+        this.logMap = checkNotNull(logMap);
+        this.listeners = new CopyOnWriteArrayList<>();
+        this.cache = CacheBuilder.newBuilder()
+                .weakValues()
+                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .concurrencyLevel(1)
+                .maximumSize(CACHE_SIZE)
+                .build();
+        this.lastLog = SeqNum.INITIAL;
+    }
+
+    // called from Hazelcast thread
+    @Override
+    public void entryAdded(EntryEvent<SeqNum, LogValue> event) {
+        // Cache maintenance
+        cache.put(event.getKey(),
+                  ConcurrentUtils.constantFuture(event.getValue()));
+
+        // TODO will need suppress mechanism once we have health check
+
+        if (lastLog.compareTo(event.getKey()) < 0) {
+            lastLog = event.getKey();
+        }
+        for (LogEventListener lsnr : listeners) {
+            lsnr.logAdded(event.getKey());
+        }
+    }
+
+    // called from Hazelcast thread
+    @Override
+    public void entryRemoved(EntryEvent<SeqNum, LogValue> event) {
+        // Cache maintenance
+        cache.invalidate(event.getKey());
+
+        // only add will be notified to listeners
+    }
+
+    // called from Hazelcast thread
+    @Override
+    public void entryUpdated(EntryEvent<SeqNum, LogValue> event) {
+        // Cache maintenance
+        cache.put(event.getKey(),
+                  ConcurrentUtils.constantFuture(event.getValue()));
+
+        // only add will be notified to listeners
+    }
+
+    // called from Hazelcast thread
+    @Override
+    public void entryEvicted(EntryEvent<SeqNum, LogValue> event) {
+        // Cache maintenance
+        cache.invalidate(event.getKey());
+
+        // only add will be notified to listeners
+    }
+
+    /**
+     * Adds the listener for specified shared object log event.
+     *
+     * @param listener to add
+     */
+    public void addListener(LogEventListener listener) {
+        listeners.addIfAbsent(listener);
+    }
+
+    /**
+     * Removes the listener for specified shared object log event.
+     *
+     * @param listener to add
+     */
+    public void removeListener(LogEventListener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * Reads LogValue from LogMap.
+     * <p>
+     * It will use the cached value if it exist, if not it will
+     * get the value from distributed store.
+     *
+     * @param key log sequence number
+     * @return Future containing log value
+     */
+    public Future<LogValue> getLogValue(final SeqNum key) {
+        try {
+            return cache.get(key, new Callable<Future<LogValue>>() {
+
+                @Override
+                public Future<LogValue> call() throws Exception {
+                    return logMap.getAsync(key);
+                }
+            });
+        } catch (ExecutionException e) {
+            log.error("Reading from Log Map failed.", e);
+            // should never happen?
+            return ConcurrentUtils.constantFuture(null);
+        }
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogValueSerializer.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogValueSerializer.java
new file mode 100644
index 0000000..87b4e3f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/hazelcast/LogValueSerializer.java
@@ -0,0 +1,77 @@
+package net.onrc.onos.core.util.distributed.sharedlog.hazelcast;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.NoOp;
+import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
+
+import com.google.common.annotations.Beta;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+/**
+ * Serializer for LogValue.
+ */
+@Beta
+public class LogValueSerializer implements StreamSerializer<LogValue> {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(LogValueSerializer.class);
+
+    //
+    // | LogValue type (1 byte) | (type specific) |
+    //
+
+    // 1st byte identifying LogValue type
+    private static final int NO_OP = 0;
+    private static final int BYTE_VALUE = 1;
+
+    @Override
+    public int getTypeId() {
+        return HazelcastSerializationConstants.LOG_VALUE_TYPE_ID;
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    @Override
+    public void write(ObjectDataOutput out, LogValue object) throws IOException {
+
+        if (object instanceof ByteValue) {
+            out.writeByte(BYTE_VALUE);
+            ByteValue bytes = (ByteValue) object;
+            out.writeObject(bytes.getBytes());
+
+        } else if (object instanceof NoOp) {
+            out.writeByte(NO_OP);
+
+        } else {
+            log.error("Unexpected type encountered: {}", object);
+        }
+    }
+
+    @Override
+    public LogValue read(ObjectDataInput in) throws IOException {
+
+        final int typeId = in.readByte();
+        switch(typeId) {
+        case BYTE_VALUE:
+            return new ByteValue(in.<byte[]>readObject());
+
+        case NO_OP:
+            return NoOp.VALUE;
+
+        default:
+        }
+
+        log.error("Unexpected type encountered: {}", typeId);
+        return null;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/LogValue.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/LogValue.java
new file mode 100644
index 0000000..726f37b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/LogValue.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+
+
+// LogMap value transition
+//
+//  (null) -> ByteValue   : log successfully written
+//  (null) -> NoOp   : allocated SeqNum abandoned/timed out
+//  NoOp -> Recycled : became part of SnapShot
+
+// When log reader encounter null
+//  (case 1) wait for writer, if time out invalidate log with NoOp
+//  (case 2) too far behind, reader need to reset to snap shot
+// XXX How to distinguish above is TBD. For now assuming case 2 will never happen
+
+/**
+ * Value stored in LogMap.
+ */
+@Beta
+@Immutable
+public interface LogValue {
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/NoOp.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/NoOp.java
new file mode 100644
index 0000000..db1666c
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/NoOp.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import com.google.common.annotations.Beta;
+
+
+// TODO register to Kryo?
+// TODO Not sure if reusing for SnapShotValue is good idea
+/**
+ * Value representing this log entry was abandoned.
+ */
+@Beta
+public final class NoOp implements LogValue, SnapShotValue {
+
+    /**
+     * The NoOp.
+     */
+    public static final NoOp VALUE = new NoOp();
+
+    /**
+     * Constructor.
+     */
+    protected NoOp() {}
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof NoOp) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/SnapShotValue.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/SnapShotValue.java
new file mode 100644
index 0000000..5553c64
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/SnapShotValue.java
@@ -0,0 +1,14 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Value stored in SnapShot Map.
+ */
+@Beta
+@Immutable
+public interface SnapShotValue {
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/StringID.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/StringID.java
new file mode 100644
index 0000000..07588a8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/internal/StringID.java
@@ -0,0 +1,62 @@
+package net.onrc.onos.core.util.distributed.sharedlog.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
+
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+
+/**
+ * Simple String implementation of SharedLogObjectID.
+ */
+@Beta
+public class StringID implements SharedLogObjectID {
+
+    private final String id;
+
+    /**
+     * Constructor.
+     *
+     * @param id String
+     */
+    public StringID(final String id) {
+        this.id = checkNotNull(id);
+        checkArgument(!id.isEmpty(), "id cannot be empty String");
+    }
+
+
+    @Override
+    public String getObjectName() {
+        return id;
+    }
+
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        StringID other = (StringID) obj;
+        return Objects.equal(id, other.id);
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("id", id)
+                .toString();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/LogBasedRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/LogBasedRuntime.java
new file mode 100644
index 0000000..0cd121e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/LogBasedRuntime.java
@@ -0,0 +1,127 @@
+package net.onrc.onos.core.util.distributed.sharedlog.runtime;
+
+import java.util.List;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.ByteValue;
+import net.onrc.onos.core.util.distributed.sharedlog.LogEventListener;
+import net.onrc.onos.core.util.distributed.sharedlog.SeqNum;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObject;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogNotApplicable;
+import net.onrc.onos.core.util.distributed.sharedlog.exception.LogWriteTimedOut;
+import net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue;
+
+
+/**
+ * Runtime to read/write shared object using shared log.
+ */
+@Beta
+@ThreadSafe
+public interface LogBasedRuntime {
+
+    // FIXME better method name
+    /**
+     * Assign or get already assigned SharedLogObject ID.
+     *
+     * @param sharedObjectName The name of the shared log object
+     * @return SharedLogObjectID
+     */
+    public SharedLogObjectID getOid(final String sharedObjectName);
+
+    /**
+     * Apply given logValue to SharedLogObject.
+     *
+     * @param sobj SharedLogObject to manipulate.
+     * @param logValue LogValue to apply
+     * @param queryBeforeUpdate true if log should be replayed to latest before updating
+     * @return sequence number of the new log entry
+     *
+     * @throws LogNotApplicable Thrown when {@code logValue} was not applicable to
+     *          SharedLogObject.
+     * @throws LogWriteTimedOut Thrown when write failed due to time out.
+     */
+    public SeqNum updateHelper(final SharedLogObject sobj, final ByteValue logValue,
+                             final boolean queryBeforeUpdate)
+                                throws LogNotApplicable, LogWriteTimedOut;
+
+    /**
+     * Reads from the shared log and
+     * updates the {@link SharedLogObject} to latest state,
+     * by calling {@link SharedLogObject#apply(SeqNum, LogValue)}.
+     *
+     * @param sobj SharedLogObject to update.
+     * @throws LogNotApplicable
+     */
+    public void queryHelper(final SharedLogObject sobj);
+
+
+    /**
+     * Reads from the shared log and
+     * updates the {@link SharedLogObject} to replayTo,
+     * by calling {@link SharedLogObject#apply(SeqNum, LogValue)}.
+     *
+     * @param sobj SharedLogObject to update.
+     * @param replayTo Sequence number to replay to. (must be <= latest)
+     */
+    public void queryHelper(final SharedLogObject sobj, final SeqNum replayTo);
+
+    // TODO we may need to provide a way to do manual update (getSeqNum, update)
+
+
+    // Snapshot related interfaces
+
+    // TODO should there be a public interface to create snapshot?
+    // If we expose to public, how to determine "latest" may have issue
+
+    /**
+     * Gets the latest snapshot ID.
+     *
+     * @param oid {@link SharedLogObjectID}
+     * @return latest snapshot ID
+     */
+    public SeqNum getLatestSnapShotId(final SharedLogObjectID oid);
+
+    // TODO give me better name for LogBaseRuntime#resetToSnapShot
+    /**
+     * Updates the {@link SharedLogObject} to specified snapshot.
+     *
+     * @param sobj {@link SharedLogObject} to update
+     * @param ssId snap shot ID
+     * @return true if success, false otherwise.
+     */
+    public boolean resetToSnapShot(final SharedLogObject sobj, final SeqNum ssId);
+
+
+    // TODO should LogRuntime be responsible for polling, to detect dropped notification.
+    /**
+     * Adds the listener for specified shared object log event.
+     *
+     * @param oid {@link SharedLogObjectID}
+     * @param listener to add
+     */
+    public void addListener(SharedLogObjectID oid, LogEventListener listener);
+
+    /**
+     * Removes the listener for specified shared object log event.
+     *
+     * @param oid {@link SharedLogObjectID}
+     * @param listener to add
+     */
+    public void removeListener(SharedLogObjectID oid, LogEventListener listener);
+
+    // TODO might not need this any more
+    /**
+     * Gets the LogValues in range ({@code after}, {@code upToThis}].
+     *
+     * @param oid ID of SharedLogObject
+     * @param after sequence number before (typically current sequence number)
+     * @param upToThis last sequence number you want to retrieve
+     * @return List of {@link LogValue}s in specified range
+     */
+    public List<LogValue> getLogRange(final SharedLogObjectID oid,
+                                    final SeqNum after, final SeqNum upToThis);
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/SequencerRuntime.java b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/SequencerRuntime.java
new file mode 100644
index 0000000..bf432c6
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/sharedlog/runtime/SequencerRuntime.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.core.util.distributed.sharedlog.runtime;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.Beta;
+
+import net.onrc.onos.core.util.distributed.sharedlog.Sequencer;
+import net.onrc.onos.core.util.distributed.sharedlog.SharedLogObjectID;
+
+/**
+ * Runtime to get Sequencer implementation for given SharedLogObjectID.
+ */
+@Beta
+@ThreadSafe
+public interface SequencerRuntime {
+
+    /**
+     * Gets the Sequencer for specified SharedLogObjectID.
+     *
+     * @param id SharedLogObjectID
+     * @return Sequencer for specified id
+     */
+    public Sequencer getSequencer(SharedLogObjectID id);
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/HazelcastSerializationConstants.java b/src/main/java/net/onrc/onos/core/util/serializers/HazelcastSerializationConstants.java
new file mode 100644
index 0000000..1c8f876
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/serializers/HazelcastSerializationConstants.java
@@ -0,0 +1,33 @@
+package net.onrc.onos.core.util.serializers;
+
+/**
+ * Hazelcast serializer related constants.
+ */
+public final class HazelcastSerializationConstants {
+
+    // Type ID
+
+    /**
+     * Type ID for {@link net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue}.
+     */
+    public static final int VERSIONED_VALUE_TYPE_ID = 1;
+
+    /**
+     * Type ID for {@link net.onrc.onos.core.util.distributed.sharedlog.internal.LogValue}.
+     */
+    public static final int LOG_VALUE_TYPE_ID = 2;
+
+
+    // Factory ID
+
+    // WARNING: hard coded value exist in hazelcast.xml
+    /**
+     * Factory ID for {@link net.onrc.onos.core.datastore.hazelcast.VersionedValueSerializableFactory}.
+     */
+    public static final int VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID = 1;
+
+    /**
+     * Avoid instantiation.
+     */
+    private HazelcastSerializationConstants() {}
+}