DistributedIntentStore: CopyCat version of Distributed intent store

- old DistributedIntentStore renamed to Hazelcast~ and is by default disabled

Change-Id: I386eaf6c136f8a2fbebb4268d20b1395249e77ea
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
index e09a934..4c146d1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
@@ -34,6 +34,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static org.slf4j.LoggerFactory.getLogger;
 
+// FIXME This is not distributed yet.
 @Component(immediate = true)
 @Service
 public class DistributedIntentBatchQueue implements IntentBatchService {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
index ae5fb4a..a6d5b15 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
@@ -16,15 +16,12 @@
 package org.onlab.onos.store.intent.impl;
 
 import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.EntryAdapter;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryListener;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.Member;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.net.intent.Intent;
 import org.onlab.onos.net.intent.IntentEvent;
@@ -32,10 +29,13 @@
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.IntentStore;
 import org.onlab.onos.net.intent.IntentStoreDelegate;
-import org.onlab.onos.store.hz.AbstractHazelcastStore;
-import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.serializers.KryoNamespaces;
 import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.service.DatabaseAdminService;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.impl.CMap;
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
@@ -52,7 +52,7 @@
 @Component(immediate = true)
 @Service
 public class DistributedIntentStore
-        extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
+        extends AbstractStore<IntentEvent, IntentStoreDelegate>
         implements IntentStore {
 
     /** Valid parking state, which can transition to INSTALLED. */
@@ -64,22 +64,29 @@
     private final Logger log = getLogger(getClass());
 
     // Assumption: IntentId will not have synonyms
-    private SMap<IntentId, Intent> intents;
-    private SMap<IntentId, IntentState> states;
+    private CMap<IntentId, Intent> intents;
+    private CMap<IntentId, IntentState> states;
 
+    // TODO left behind transient state issue: ONOS-103
     // Map to store instance local intermediate state transition
     private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
 
-    private SMap<IntentId, List<Intent>> installable;
+    private CMap<IntentId, List<Intent>> installable;
 
-    @Override
+    private StoreSerializer serializer;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DatabaseAdminService dbAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DatabaseService dbService;
+
     @Activate
     public void activate() {
         // FIXME: We need a way to add serializer for intents which has been plugged-in.
         // As a short term workaround, relax Kryo config to
         // registrationRequired=false
-        super.activate();
-        super.serializer = new KryoSerializer() {
+        serializer = new KryoSerializer() {
 
             @Override
             protected void setupKryoPool() {
@@ -89,24 +96,15 @@
                         .build()
                         .populate(1);
             }
-
         };
 
-        // TODO: enable near cache, allow read from backup for this IMap
-        IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
-        intents = new SMap<>(rawIntents , super.serializer);
+        intents = new CMap<>(dbAdminService, dbService, "intents", serializer);
 
-        // TODO: disable near cache, disable read from backup for this IMap
-        IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
-        states = new SMap<>(rawStates , super.serializer);
-        EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
-        states.addEntryListener(listener , false);
+        states = new CMap<>(dbAdminService, dbService, "intent-states", serializer);
 
         transientStates.clear();
 
-        // TODO: disable near cache, disable read from backup for this IMap
-        IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
-        installable = new SMap<>(rawInstallables , super.serializer);
+        installable = new CMap<>(dbAdminService, dbService, "installable-intents", serializer);
 
         log.info("Started");
     }
@@ -118,8 +116,8 @@
 
     @Override
     public IntentEvent createIntent(Intent intent) {
-        Intent existing = intents.putIfAbsent(intent.id(), intent);
-        if (existing != null) {
+        boolean absent = intents.putIfAbsent(intent.id(), intent);
+        if (!absent) {
             // duplicate, ignore
             return null;
         } else {
@@ -173,34 +171,47 @@
         IntentEvent.Type type = null;
         final IntentState prevParking;
         boolean transientStateChangeOnly = false;
+        boolean updated;
 
         // parking state transition
         switch (state) {
         case SUBMITTED:
-            prevParking = states.putIfAbsent(id, SUBMITTED);
+            prevParking = states.get(id);
             verify(prevParking == null,
                    "Illegal state transition attempted from %s to SUBMITTED",
                    prevParking);
+            updated = states.putIfAbsent(id, SUBMITTED);
+            verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
             type = IntentEvent.Type.SUBMITTED;
             break;
+
         case INSTALLED:
-            prevParking = states.replace(id, INSTALLED);
+            prevParking = states.get(id);
             verify(PRE_INSTALLED.contains(prevParking),
                    "Illegal state transition attempted from %s to INSTALLED",
                    prevParking);
+            updated = states.replace(id, prevParking, INSTALLED);
+            verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALLED);
             type = IntentEvent.Type.INSTALLED;
             break;
+
         case FAILED:
-            prevParking = states.replace(id, FAILED);
+            prevParking = states.get(id);
+            updated = states.replace(id, prevParking, FAILED);
+            verify(updated, "Conditional replace %s => %s failed", prevParking, FAILED);
             type = IntentEvent.Type.FAILED;
             break;
+
         case WITHDRAWN:
-            prevParking = states.replace(id, WITHDRAWN);
+            prevParking = states.get(id);
             verify(PRE_WITHDRAWN.contains(prevParking),
                    "Illegal state transition attempted from %s to WITHDRAWN",
                    prevParking);
+            updated = states.replace(id, prevParking, WITHDRAWN);
+            verify(updated, "Conditional replace %s => %s failed", prevParking, WITHDRAWN);
             type = IntentEvent.Type.WITHDRAWN;
             break;
+
         default:
             transientStateChangeOnly = true;
             prevParking = null;
@@ -233,22 +244,4 @@
     public void removeInstalledIntents(IntentId intentId) {
         installable.remove(intentId);
     }
-
-    public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
-
-        @Override
-        public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
-            final Member myself = theInstance.getCluster().getLocalMember();
-            if (!myself.equals(event.getMember())) {
-                // When Intent state was modified by remote node,
-                // clear local transient state.
-                final IntentId intentId = event.getKey();
-                IntentState oldState = transientStates.remove(intentId);
-                if (oldState != null) {
-                    log.debug("{} state updated remotely, removing transient state {}",
-                              intentId, oldState);
-                }
-            }
-        }
-    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
new file mode 100644
index 0000000..eaad56d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.store.intent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.EntryAdapter;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentState;
+import org.onlab.onos.net.intent.IntentStore;
+import org.onlab.onos.net.intent.IntentStoreDelegate;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+import org.slf4j.Logger;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.base.Verify.verify;
+import static org.onlab.onos.net.intent.IntentState.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = false, enabled = false)
+@Service
+public class HazelcastIntentStore
+        extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
+        implements IntentStore {
+
+    /** Valid parking state, which can transition to INSTALLED. */
+    private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, FAILED);
+
+    /** Valid parking state, which can transition to WITHDRAWN. */
+    private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
+
+    private final Logger log = getLogger(getClass());
+
+    // Assumption: IntentId will not have synonyms
+    private SMap<IntentId, Intent> intents;
+    private SMap<IntentId, IntentState> states;
+
+    // Map to store instance local intermediate state transition
+    private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
+
+    private SMap<IntentId, List<Intent>> installable;
+
+    @Override
+    @Activate
+    public void activate() {
+        // FIXME: We need a way to add serializer for intents which has been plugged-in.
+        // As a short term workaround, relax Kryo config to
+        // registrationRequired=false
+        super.activate();
+        super.serializer = new KryoSerializer() {
+
+            @Override
+            protected void setupKryoPool() {
+                serializerPool = KryoNamespace.newBuilder()
+                        .setRegistrationRequired(false)
+                        .register(KryoNamespaces.API)
+                        .build()
+                        .populate(1);
+            }
+
+        };
+
+        // TODO: enable near cache, allow read from backup for this IMap
+        IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
+        intents = new SMap<>(rawIntents , super.serializer);
+
+        // TODO: disable near cache, disable read from backup for this IMap
+        IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
+        states = new SMap<>(rawStates , super.serializer);
+        EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
+        states.addEntryListener(listener , false);
+
+        transientStates.clear();
+
+        // TODO: disable near cache, disable read from backup for this IMap
+        IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
+        installable = new SMap<>(rawInstallables , super.serializer);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public IntentEvent createIntent(Intent intent) {
+        Intent existing = intents.putIfAbsent(intent.id(), intent);
+        if (existing != null) {
+            // duplicate, ignore
+            return null;
+        } else {
+            return this.setState(intent, IntentState.SUBMITTED);
+        }
+    }
+
+    @Override
+    public IntentEvent removeIntent(IntentId intentId) {
+        Intent intent = intents.remove(intentId);
+        installable.remove(intentId);
+        if (intent == null) {
+            // was already removed
+            return null;
+        }
+        IntentEvent event = this.setState(intent, WITHDRAWN);
+        states.remove(intentId);
+        transientStates.remove(intentId);
+        // TODO: Should we callremoveInstalledIntents if this Intent was
+        return event;
+    }
+
+    @Override
+    public long getIntentCount() {
+        return intents.size();
+    }
+
+    @Override
+    public Iterable<Intent> getIntents() {
+        return ImmutableSet.copyOf(intents.values());
+    }
+
+    @Override
+    public Intent getIntent(IntentId intentId) {
+        return intents.get(intentId);
+    }
+
+    @Override
+    public IntentState getIntentState(IntentId id) {
+        final IntentState localState = transientStates.get(id);
+        if (localState != null) {
+            return localState;
+        }
+        return states.get(id);
+    }
+
+
+    @Override
+    public IntentEvent setState(Intent intent, IntentState state) {
+        final IntentId id = intent.id();
+        IntentEvent.Type type = null;
+        final IntentState prevParking;
+        boolean transientStateChangeOnly = false;
+
+        // parking state transition
+        switch (state) {
+        case SUBMITTED:
+            prevParking = states.putIfAbsent(id, SUBMITTED);
+            verify(prevParking == null,
+                   "Illegal state transition attempted from %s to SUBMITTED",
+                   prevParking);
+            type = IntentEvent.Type.SUBMITTED;
+            break;
+        case INSTALLED:
+            prevParking = states.replace(id, INSTALLED);
+            verify(PRE_INSTALLED.contains(prevParking),
+                   "Illegal state transition attempted from %s to INSTALLED",
+                   prevParking);
+            type = IntentEvent.Type.INSTALLED;
+            break;
+        case FAILED:
+            prevParking = states.replace(id, FAILED);
+            type = IntentEvent.Type.FAILED;
+            break;
+        case WITHDRAWN:
+            prevParking = states.replace(id, WITHDRAWN);
+            verify(PRE_WITHDRAWN.contains(prevParking),
+                   "Illegal state transition attempted from %s to WITHDRAWN",
+                   prevParking);
+            type = IntentEvent.Type.WITHDRAWN;
+            break;
+        default:
+            transientStateChangeOnly = true;
+            prevParking = null;
+            break;
+        }
+        if (!transientStateChangeOnly) {
+            log.debug("Parking State change: {} {}=>{}",  id, prevParking, state);
+        }
+        // Update instance local state, which includes non-parking state transition
+        final IntentState prevTransient = transientStates.put(id, state);
+        log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
+
+        if (type == null) {
+            return null;
+        }
+        return new IntentEvent(type, intent);
+    }
+
+    @Override
+    public void setInstallableIntents(IntentId intentId, List<Intent> result) {
+        installable.put(intentId, result);
+    }
+
+    @Override
+    public List<Intent> getInstallableIntents(IntentId intentId) {
+        return installable.get(intentId);
+    }
+
+    @Override
+    public void removeInstalledIntents(IntentId intentId) {
+        installable.remove(intentId);
+    }
+
+    public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
+
+        @Override
+        public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
+            final Member myself = theInstance.getCluster().getLocalMember();
+            if (!myself.equals(event.getMember())) {
+                // When Intent state was modified by remote node,
+                // clear local transient state.
+                final IntentId intentId = event.getKey();
+                IntentState oldState = transientStates.remove(intentId);
+                if (oldState != null) {
+                    log.debug("{} state updated remotely, removing transient state {}",
+                              intentId, oldState);
+                }
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index f6ea217..2fbef0b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -321,7 +321,7 @@
         private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
         private long versionCounter = 1;
 
-        public Map<String, VersionedValue> getTable(String tableName) {
+        Map<String, VersionedValue> getTable(String tableName) {
             return tableData.get(tableName);
         }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
index 9ca5494..0821b4d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -55,6 +55,7 @@
                 .mmapFileEnableIfSupported()
                 .cacheSize(cacheSize)
                 .makeTxMaker();
+        log.info("Raft log file: {}", dbFile.getCanonicalPath());
     }
 
     @Override