Renamed datagrid and datastore packages

net.onrc.onos.datagrid.* => net.onrc.onos.core.datagrid.*
net.onrc.onos.datastore.* => net.onrc.onos.core.datastore.*

Change-Id: Ibe1894a6fabae08ea7cfcbf6595f0c91b05ef497
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
new file mode 100644
index 0000000..b21ef8e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -0,0 +1,345 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import java.io.FileNotFoundException;
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.core.datastore.IKVClient;
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
+import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializationConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public class HZClient implements IKVClient {
+    private static final Logger log = LoggerFactory.getLogger(HZClient.class);
+
+    static final long VERSION_NONEXISTENT = 0L;
+
+    private static final String MAP_PREFIX = "datastore://";
+
+    // Path of the base Hazelcast XML configuration; overridable through the system property below.
+    private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
+    private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true")); // reset to false when start-up falls back to Instance mode
+
+    // Note: xml configuration will overwrite this value if present
+    private static int backupCount = Integer.parseInt(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
+
+    private final HazelcastInstance hazelcastInstance;
+
+    private static final HZClient THE_INSTANCE = new HZClient();
+
+    public static HZClient getClient() { // accessor for the single shared client
+        return THE_INSTANCE; // created once, by this class's static initializer
+    }
+
+    private HZClient() { // private: the only instance is THE_INSTANCE
+        hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME); // Hazelcast client or embedded instance
+    }
+
+    /**
+     * Starts the Hazelcast backend: a client when client mode is enabled and a
+     * cluster member can be found, otherwise an embedded Hazelcast instance.
+     * @param hazelcastConfigFileName path of the base Hazelcast XML configuration
+     * @return the Hazelcast client or the embedded instance
+     */
+    private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
+        Config baseHzConfig = null;
+        try {
+            baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
+        } catch (FileNotFoundException e) {
+            log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
+            throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
+        }
+
+        // use xml config if present, if not use System.property
+        MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
+        if (mapConfig != null) {
+            backupCount = mapConfig.getBackupCount();
+        }
+
+        if (useClientMode) {
+            log.info("Configuring Hazelcast datastore as Client mode");
+            ClientConfig clientConfig = new ClientConfig();
+            final int port = baseHzConfig.getNetworkConfig().getPort();
+            String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
+            clientConfig.addAddress(server + ":" + port);
+
+            // copy group config from base Hazelcast configuration
+            clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
+            clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
+
+            // TODO We probably need to figure out what else needs to be
+            // derived from baseConfig
+            registerSerializer(clientConfig.getSerializationConfig());
+
+            log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
+            try {
+                HazelcastInstance instance = HazelcastClient.newHazelcastClient(clientConfig);
+                if (!instance.getCluster().getMembers().isEmpty()) {
+                    log.debug("Members in cluster: {}", instance.getCluster().getMembers());
+                    return instance;
+                }
+                log.info("Failed to find cluster member, falling back to Instance mode");
+                // release the unusable client instead of leaving it running unreferenced
+                instance.getLifecycleService().shutdown();
+            } catch (IllegalStateException e) {
+                log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
+            }
+            useClientMode = false;
+        }
+        log.info("Configuring Hazelcast datastore as Instance mode");
+        // To run 2 Hazelcast instances in 1 JVM,
+        // we probably need to do something like below
+        //int port = hazelcastConfig.getNetworkConfig().getPort();
+        //hazelcastConfig.getNetworkConfig().setPort(port+1);
+        registerSerializer(baseHzConfig.getSerializationConfig());
+        return Hazelcast.newHazelcastInstance(baseHzConfig);
+    }
+
+    /**
+     * Register serializer for VersionedValue class used to imitate value version.
+     * @param config serialization config that receives the factory registration
+     */
+    private static void registerSerializer(final SerializationConfig config) {
+        config.addDataSerializableFactoryClass(
+                VersionedValueSerializableFactory.FACTORY_ID,
+                VersionedValueSerializableFactory.class);
+    }
+
+    @Override
+    public IKVTable getTable(final String tableName) {
+        final String backendName = MAP_PREFIX + tableName;
+        final IMap<byte[], VersionedValue> backend = hazelcastInstance.getMap(backendName);
+        if (useClientMode) {
+            // config is not available in Client Mode; it must rely on hazelcast.xml to be properly configured.
+            return new HZTable(tableName, backend);
+        }
+
+        // Instance Mode: adjust the config of this map so that it is strongly consistent.
+        // NOTE(review): the map is obtained before its config is adjusted - confirm the settings still apply.
+        final MapConfig mapCfg = hazelcastInstance.getConfig().getMapConfig(backendName);
+        if (mapCfg.isReadBackupData()) {
+            mapCfg.setReadBackupData(false);
+        }
+        if (mapCfg.isNearCacheEnabled()) {
+            mapCfg.getNearCacheConfig().setMaxSize(0);
+        }
+        if (mapCfg.getBackupCount() != backupCount) {
+            mapCfg.setAsyncBackupCount(0);
+            mapCfg.setBackupCount(backupCount);
+        }
+        return new HZTable(tableName, backend);
+    }
+
+    @Override
+    public void dropTable(final IKVTable table) { // the table is cast to HZTable, so it must come from this backend
+        ((HZTable) table).getBackendMap().clear(); // only clear() is called on the backing map
+    }
+
+    @Override
+    public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
+            throws ObjectExistsException {
+        // table IDs issued by HZTable are the table objects themselves, hence the cast
+        return ((IKVTable) tableId).create(key, value);
+    }
+
+    @Override
+    public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+        // straight delegation to the table behind the ID
+        return ((IKVTable) tableId).forceCreate(key, value);
+    }
+
+    @Override
+    public IKVEntry read(final IKVTableID tableId, final byte[] key)
+            throws ObjectDoesntExistException {
+        // the ID is cast to the table and the lookup is forwarded to it
+        return ((IKVTable) tableId).read(key);
+    }
+
+    @Override
+    public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
+            final long version) throws ObjectDoesntExistException, WrongVersionException {
+        // versioned variant: the expected version is handed to the table unchanged
+        final IKVTable target = (IKVTable) tableId;
+        return target.update(key, value, version);
+    }
+
+    @Override
+    public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
+            throws ObjectDoesntExistException {
+        // variant without an expected version
+        return ((IKVTable) tableId).update(key, value);
+    }
+
+    @Override
+    public long delete(final IKVTableID tableId, final byte[] key, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+        // forwarded together with the version supplied by the caller
+        return ((IKVTable) tableId).delete(key, version);
+    }
+
+    @Override
+    public long forceDelete(final IKVTableID tableId, final byte[] key) {
+        // variant without a version argument, handled by the table itself
+        return ((IKVTable) tableId).forceDelete(key);
+    }
+
+    @Override
+    public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
+        // the table behind the ID enumerates its own entries
+        return ((IKVTable) tableId).getAllEntries();
+    }
+
+    @Override
+    public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value) {
+        return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
+    }
+
+    @Override
+    public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value) {
+        return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
+    }
+
+    @Override
+    public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) { // only builds the operation
+        return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ); // constructor variant without value/version
+    }
+
+    @Override
+    public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value, final long version) {
+        return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
+    }
+
+    @Override
+    public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value, final long version) {
+        return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
+    }
+
+    @Override
+    public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) { // only builds the operation
+        return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE); // constructor variant without value/version
+    }
+
+    @Override
+    public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+        boolean failExists = false;
+        for (IMultiEntryOperation op : ops) {
+            HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
+            switch (mop.getOperation()) {
+            case DELETE:
+                try {
+                    final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
+                    mop.setVersion(version);
+                    mop.setStatus(STATUS.SUCCESS);
+                } catch (ObjectDoesntExistException | WrongVersionException e) {
+                    log.error(mop + " failed.", e);
+                    mop.setStatus(STATUS.FAILED);
+                    failExists = true;
+                }
+                break;
+            case FORCE_DELETE:
+                final long version = forceDelete(mop.getTableId(), mop.getKey());
+                mop.setVersion(version);
+                mop.setStatus(STATUS.SUCCESS);
+                break;
+            default:
+                throw new UnsupportedOperationException(mop.toString());
+            }
+        }
+        return failExists;
+    }
+
+    @Override
+    public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+        // there may be room to batch to improve performance
+        boolean failExists = false;
+        for (IMultiEntryOperation op : ops) {
+            IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+            switch (mop.getOperation()) {
+            case CREATE:
+                try {
+                    long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
+                    mop.setVersion(version);
+                    mop.setStatus(STATUS.SUCCESS);
+                } catch (ObjectExistsException e) {
+                    log.error(mop + " failed.", e);
+                    mop.setStatus(STATUS.FAILED);
+                    failExists = true;
+                }
+                break;
+            case FORCE_CREATE:
+            {
+                final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
+                mop.setVersion(version);
+                mop.setStatus(STATUS.SUCCESS);
+                break;
+            }
+            case UPDATE:
+                try {
+                    long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
+                    mop.setVersion(version);
+                    mop.setStatus(STATUS.SUCCESS);
+                } catch (ObjectDoesntExistException | WrongVersionException e) {
+                    log.error(mop + " failed.", e);
+                    mop.setStatus(STATUS.FAILED);
+                    failExists = true;
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException(mop.toString());
+            }
+        }
+        return failExists;
+    }
+
+    @Override
+    public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+        // First pass: issue every read asynchronously before waiting on any result.
+        for (final IMultiEntryOperation each : ops) {
+            final IModifiableMultiEntryOperation readOp = (IModifiableMultiEntryOperation) each;
+            final HZTable table = (HZTable) each.getTableId();
+            ((HZMultiEntryOperation) readOp.getActualOperation()).setFuture(table.getBackendMap().getAsync(each.getKey()));
+        }
+
+        // Second pass: hasSucceeded() is expected to resolve the read and update the status.
+        boolean anyFailed = false;
+        for (final IMultiEntryOperation each : ops) {
+            if (!((IModifiableMultiEntryOperation) each).hasSucceeded()) {
+                anyFailed = true;
+            }
+        }
+
+        return anyFailed;
+    }
+
+    @Override
+    public long VERSION_NONEXISTENT() { // method form of the constant of the same name
+        return VERSION_NONEXISTENT; // the static field declared in this class (0L)
+    }
+
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZMultiEntryOperation.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZMultiEntryOperation.java
new file mode 100644
index 0000000..a4842ef
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZMultiEntryOperation.java
@@ -0,0 +1,162 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+
+public class HZMultiEntryOperation implements IMultiEntryOperation, IModifiableMultiEntryOperation {
+    private static final Logger log = LoggerFactory.getLogger(HZMultiEntryOperation.class);
+
+    private final HZTable table;
+    private final byte[] key;
+    protected final OPERATION operation;
+    private STATUS status;
+
+    // for read op: pending asynchronous read, null until setFuture() is called
+    private Future<VersionedValue> future;
+    // for write op: value and version to write; for read op: last value read
+    private VersionedValue writeValue;
+
+    /**
+     * Constructor for Read/ForceDelete Operation.
+     * @param table table the operation targets
+     * @param key key of the entry the operation targets
+     * @param operation kind of operation to perform
+     */
+    public HZMultiEntryOperation(final HZTable table, final byte[] key, final OPERATION operation) {
+        this.table = table;
+        this.key = key;
+        this.status = STATUS.NOT_EXECUTED;
+        this.operation = operation;
+        // future and writeValue start out as null
+    }
+
+    /**
+     * Constructor for Other Operations.
+     * @param table table the operation targets
+     * @param key key of the entry the operation targets
+     * @param value value carried by the operation
+     * @param version version carried by the operation
+     * @param operation kind of operation to perform
+     */
+    public HZMultiEntryOperation(final HZTable table, final byte[] key, final byte[] value, final long version, final OPERATION operation) {
+        this.table = table;
+        this.key = key;
+        this.status = STATUS.NOT_EXECUTED;
+        this.operation = operation;
+        this.writeValue = new VersionedValue(value, version);
+    }
+
+    @Override
+    public boolean hasSucceeded() {
+        get(); // resolves a pending read, if any, and refreshes the status
+        return this.status == STATUS.SUCCESS;
+    }
+
+    @Override
+    public STATUS getStatus() {
+        get(); // bring the status up to date when a read is pending
+        return status;
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+        return table;
+    }
+
+    @Override
+    public byte[] getKey() {
+        return key;
+    }
+
+    @Override
+    public byte[] getValue() {
+        // read op: the value read (null on failure); otherwise the value to write, if any
+        final VersionedValue current = get();
+        return (current == null) ? null : current.getValue();
+    }
+
+    @Override
+    public long getVersion() {
+        // VERSION_NONEXISTENT when a read failed or no version has been recorded yet
+        final VersionedValue current = get();
+        return (current == null) ? HZClient.VERSION_NONEXISTENT : current.getVersion();
+    }
+
+    @Override
+    public OPERATION getOperation() {
+        return operation;
+    }
+
+    /**
+     * Evaluate Future object (if any) and set Status and Value+Version.
+     * Without a Future (non-read operation) the status is left untouched.
+     * @return the value read, or writeValue when there is no Future;
+     *         null on failure or when the read returned no value.
+     */
+    private VersionedValue get() {
+        if (future == null) {
+            return writeValue;
+        }
+        VersionedValue value = null;
+        try {
+            value = future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            log.error(this + " has failed.", e);
+        } catch (CancellationException | ExecutionException e) {
+            log.error(this + " has failed.", e);
+        }
+        if (value == null) { // failed, or the read returned no value for the key
+            setStatus(STATUS.FAILED);
+            return null;
+        }
+        writeValue = new VersionedValue(value.getValue(), value.getVersion());
+        setStatus(STATUS.SUCCESS);
+        return value;
+    }
+
+    @Override
+    public void setValue(final byte[] value, final long version) {
+        writeValue = new VersionedValue(value, version);
+    }
+
+    @Override
+    public void setVersion(final long version) {
+        if (future != null) {
+            return; // no-op on read
+        }
+        final byte[] current = (writeValue == null) ? null : writeValue.getValue();
+        writeValue = new VersionedValue(current, version);
+    }
+
+    @Override
+    public void setStatus(final STATUS status) {
+        this.status = status;
+    }
+
+    @Override
+    public IModifiableMultiEntryOperation getActualOperation() {
+        return this;
+    }
+
+    void setFuture(final Future<VersionedValue> future) {
+        this.future = future;
+    }
+
+    @Override
+    public String toString() {
+        return "[HZMultiEntryOperation table=" + table + ", key="
+                + ByteArrayUtil.toHexStringBuffer(key, ":") + ", operation=" + operation
+                + ", status=" + status + ", writeValue=" + writeValue + "]";
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
new file mode 100644
index 0000000..f071ecc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
@@ -0,0 +1,325 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class HZTable implements IKVTable, IKVTableID {
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(HZTable.class);
+
+    // not sure how strict this should be managed
+    private static final AtomicLong  initialVersion = new AtomicLong(HZClient.VERSION_NONEXISTENT);
+
+    /**
+     * generate a new initial version for an entry.
+     * @return initial value
+     */
+    protected static long getInitialVersion() {
+        long version = initialVersion.incrementAndGet();
+        if (version == HZClient.VERSION_NONEXISTENT) {
+            // used up whole 64bit space?
+            version = initialVersion.incrementAndGet();
+        }
+        return version;
+    }
+
+    /**
+     * increment version, avoiding VERSION_NONEXISTENT.
+     * @param version
+     * @return
+     */
+    protected static long getNextVersion(final long version) {
+        long nextVersion = version + 1;
+        if (nextVersion == HZClient.VERSION_NONEXISTENT) {
+            ++nextVersion;
+        }
+        return nextVersion;
+    }
+
+    /**
+     * Value paired with a version number; the value type of the backing Hazelcast map.
+     */
+    static class VersionedValue implements IdentifiedDataSerializable {
+        private static final long serialVersionUID = -3149375966890712708L;
+
+        private byte[] value;
+        private long version;
+
+        // no-arg constructor, used by VersionedValueSerializableFactory.create()
+        protected VersionedValue() {
+            value = new byte[0];
+            version = HZClient.VERSION_NONEXISTENT;
+        }
+
+        public VersionedValue(final byte[] value, final long version) {
+            this.value = value;
+            this.version = version;
+        }
+
+        public byte[] getValue() {
+            return value;
+        }
+
+        public long getVersion() {
+            return version;
+        }
+
+        public void setValue(final byte[] value) {
+            this.value = value;
+        }
+
+        public void setNextVersion() {
+            this.version = getNextVersion(this.version);
+        }
+
+        /**
+         * Writes the version followed by the length-prefixed value.
+         * A null value is written as an empty one instead of failing with a NullPointerException.
+         */
+        @Override
+        public void writeData(final ObjectDataOutput out) throws IOException {
+            final byte[] bytes = (value == null) ? new byte[0] : value;
+            out.writeLong(version);
+            out.writeInt(bytes.length);
+            if (bytes.length > 0) {
+                out.write(bytes);
+            }
+        }
+
+        @Override
+        public void readData(final ObjectDataInput in) throws IOException {
+            version = in.readLong();
+            final int valueLen = in.readInt();
+            value = new byte[valueLen];
+            in.readFully(value);
+        }
+
+        @Override
+        public int getFactoryId() {
+            return VersionedValueSerializableFactory.FACTORY_ID;
+        }
+
+        @Override
+        public int getId() {
+            return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (version ^ (version >>> 32));
+            result = prime * result + Arrays.hashCode(value);
+            return result;
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            final VersionedValue other = (VersionedValue) obj;
+            return version == other.version && Arrays.equals(value, other.value);
+        }
+    }
+
+    // TODO Refactor and extract common parts
+    public static class Entry implements IKVEntry { // key, value and version of one table entry
+        final byte[] key;
+        byte[] value;
+        long version;
+
+        public Entry(final byte[] key, final byte[] value, final long version) {
+            this.key = key;
+            this.setValue(value);
+            this.setVersion(version);
+        }
+
+        public Entry(final byte[] key) { // entry with a null value and VERSION_NONEXISTENT
+            this(key, null, HZClient.VERSION_NONEXISTENT);
+        }
+
+        @Override
+        public byte[] getKey() {
+            return key;
+        }
+
+        @Override
+        public byte[] getValue() {
+            return value;
+        }
+
+        @Override
+        public long getVersion() {
+            return version;
+        }
+
+        void setValue(final byte[] value) { // package-private setter, also used by the constructor
+            this.value = value;
+        }
+
+        void setVersion(final long version) { // package-private setter, also used by the constructor
+            this.version = version;
+        }
+    }
+
+
+
+    private final String mapName;
+    private final IMap<byte[], VersionedValue> map;
+
+    public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) { // wraps an already obtained map
+        this.mapName = mapName; // returned by getTableName()
+        this.map = map; // backend that the table operations act on
+    }
+
+    @Override
+    public String getTableName() {
+        return mapName; // the name given at construction
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+        return this; // the table object serves as its own ID
+    }
+
+    @Override
+    public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
+        final long initial = getInitialVersion();
+        // a non-null result from putIfAbsent means the key was already taken
+        if (map.putIfAbsent(key, new VersionedValue(value, initial)) != null) {
+            throw new ObjectExistsException(this, key);
+        }
+        return initial;
+    }
+
+    @Override
+    public long forceCreate(final byte[] key, final byte[] value) {
+        final VersionedValue fresh = new VersionedValue(value, getInitialVersion());
+        map.set(key, fresh); // stored without checking for an existing entry
+        return fresh.getVersion();
+    }
+
+    @Override
+    public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
+        final VersionedValue stored = map.get(key);
+        if (stored != null) {
+            return new Entry(key, stored.getValue(), stored.getVersion());
+        }
+        throw new ObjectDoesntExistException(this, key); // nothing stored under the key
+    }
+
+    @Override
+    public long update(final byte[] key, final byte[] value, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+        // lock before entering try: unlock() in finally must only run when the lock is held
+        map.lock(key);
+        try {
+            final VersionedValue oldValue = map.get(key);
+            if (oldValue == null) {
+                throw new ObjectDoesntExistException(this, key);
+            }
+            if (oldValue.getVersion() != version) { // caller must present the currently stored version
+                throw new WrongVersionException(this, key, version, oldValue.getVersion());
+            }
+            final long nextVersion = getNextVersion(version);
+            map.set(key, new VersionedValue(value, nextVersion));
+            return nextVersion;
+        } finally {
+            map.unlock(key);
+        }
+    }
+
+    @Override
+    public long update(final byte[] key, final byte[] value)
+            throws ObjectDoesntExistException {
+        // lock before entering try: unlock() in finally must only run when the lock is held
+        map.lock(key);
+        try {
+            final VersionedValue valueInMap = map.get(key);
+            if (valueInMap == null) {
+                throw new ObjectDoesntExistException(this, key);
+            }
+            valueInMap.setValue(value);
+            valueInMap.setNextVersion(); // no expected version here: just advance the stored one
+            map.set(key, valueInMap);
+            return valueInMap.getVersion();
+        } finally {
+            map.unlock(key);
+        }
+    }
+
+    @Override
+    public long delete(final byte[] key, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+        // lock before entering try: unlock() in finally must only run when the lock is held
+        map.lock(key);
+        try {
+            final VersionedValue oldValue = map.get(key);
+            if (oldValue == null) {
+                throw new ObjectDoesntExistException(this, key);
+            }
+            if (oldValue.getVersion() != version) { // caller must present the currently stored version
+                throw new WrongVersionException(this, key, version, oldValue.getVersion());
+            }
+            map.delete(key);
+            return oldValue.getVersion();
+        } finally {
+            map.unlock(key);
+        }
+    }
+
+    @Override
+    public long forceDelete(final byte[] key) {
+        final VersionedValue removed = map.remove(key);
+        if (removed != null) {
+            return removed.getVersion();
+        }
+        return HZClient.VERSION_NONEXISTENT; // nothing was stored under the key
+    }
+
+    @Override
+    public Iterable<IKVEntry> getAllEntries() {
+        final Set<IMap.Entry<byte[], VersionedValue>> stored = map.entrySet();
+        final List<IKVEntry> snapshot = new ArrayList<IKVTable.IKVEntry>(stored.size());
+        for (final IMap.Entry<byte[], VersionedValue> each : stored) { // copy each map entry into an IKVEntry
+            snapshot.add(new Entry(each.getKey(), each.getValue().getValue(), each.getValue().getVersion()));
+        }
+        return snapshot;
+    }
+
+    @Override
+    public String toString() {
+        return "[HZTable " + mapName + "]"; // only the table name is included
+    }
+
+    IMap<byte[], VersionedValue> getBackendMap() { // package-private access to the underlying map
+        return this.map;
+    }
+
+    @Override
+    public long VERSION_NONEXISTENT() { // method form of HZClient's constant
+        return HZClient.VERSION_NONEXISTENT;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
new file mode 100644
index 0000000..7e77be7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+/** Hazelcast factory that creates empty {@link HZTable.VersionedValue} objects by type ID. */
+public class VersionedValueSerializableFactory implements DataSerializableFactory {
+    // revisit these magic numbers
+    public static final int FACTORY_ID = 1;
+
+    public static final int VERSIONED_VALUE_ID = 1;
+
+    /**
+     * @param typeId ID of the type to instantiate
+     * @return a new, empty VersionedValue for VERSIONED_VALUE_ID, null for any other ID
+     */
+    @Override
+    public IdentifiedDataSerializable create(final int typeId) {
+        if (typeId == VERSIONED_VALUE_ID) {
+            return new HZTable.VersionedValue();
+        }
+        return null;
+    }
+}