Enabled leadership service implementation based on consistent map + added precondition checks to partitioned database.

Change-Id: Ia76f8479d9113e7ad67e583e4ca157e62a1cabc7
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index f75cd48..b48ce21 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -73,7 +73,7 @@
  * the current leader (e.g., for informational purpose).
  * </p>
  */
-@Component(immediate = true)
+@Component(immediate = true, enabled = false)
 @Service
 public class HazelcastLeadershipService implements LeadershipService {
     private static final Logger log =
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 7ccdd80..9eed61a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -145,7 +145,7 @@
         });
         try {
             if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
-                log.warn("Timeed out watiing for database to initialize.");
+                log.warn("Timed out waiting for database to initialize.");
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 3b65ec0..0d38dd3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -52,7 +52,7 @@
  * detection capabilities to detect and purge stale locks.
  * TODO: Ensure lock safety and liveness.
  */
-@Component(immediate = true, enabled = false)
+@Component(immediate = true, enabled = true)
 @Service
 public class DistributedLeadershipManager implements LeadershipService {
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 04e5e5c..24ec92b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -36,6 +36,8 @@
 
 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
 
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * A database that partitions the keys across one or more database partitions.
  */
@@ -44,11 +46,21 @@
     private Partitioner<String> partitioner;
     private final ClusterCoordinator coordinator;
     private final Map<String, Database> partitions = Maps.newConcurrentMap();
+    private final AtomicBoolean isOpen = new AtomicBoolean(false);
+    private static final String DB_NOT_OPEN = "Database is not open";
 
     protected PartitionedDatabase(ClusterCoordinator coordinator) {
         this.coordinator = coordinator;
     }
 
+    /**
+     * Returns true if the database is open.
+     * @return true if open, false otherwise
+     */
+    public boolean isOpen() {
+        return isOpen.get();
+    }
+
     @Override
     public void registerPartition(String name, Database partition) {
         partitions.put(name, partition);
@@ -61,6 +73,7 @@
 
     @Override
     public CompletableFuture<Integer> size(String tableName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicInteger totalSize = new AtomicInteger(0);
         return CompletableFuture.allOf(partitions
                     .values()
@@ -72,16 +85,19 @@
 
     @Override
     public CompletableFuture<Boolean> isEmpty(String tableName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return size(tableName).thenApply(size -> size == 0);
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).containsKey(tableName, key);
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicBoolean containsValue = new AtomicBoolean(false);
         return CompletableFuture.allOf(partitions
                     .values()
@@ -93,21 +109,25 @@
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).get(tableName, key);
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).put(tableName, key, value);
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key);
     }
 
     @Override
     public CompletableFuture<Void> clear(String tableName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return CompletableFuture.allOf(partitions
                     .values()
                     .stream()
@@ -117,6 +137,7 @@
 
     @Override
     public CompletableFuture<Set<String>> keySet(String tableName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         Set<String> keySet = Sets.newConcurrentHashSet();
         return CompletableFuture.allOf(partitions
                     .values()
@@ -128,6 +149,7 @@
 
     @Override
     public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
         return CompletableFuture.allOf(partitions
                     .values()
@@ -139,6 +161,7 @@
 
     @Override
     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
         return CompletableFuture.allOf(partitions
                     .values()
@@ -150,31 +173,37 @@
 
     @Override
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, value);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, version);
     }
 
     @Override
     public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
     }
 
     @Override
     public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
     }
 
     @Override
     public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
         for (UpdateOperation<String, byte[]> update : updates) {
             Database partition = partitioner.getPartition(update.tableName(), update.key());
@@ -207,12 +236,15 @@
                                                         .stream()
                                                         .map(Database::open)
                                                         .toArray(CompletableFuture[]::new))
-                                 .thenApply(v -> this));
+                                 .thenApply(v -> {
+                                     isOpen.set(true);
+                                     return this; }));
 
     }
 
     @Override
     public CompletableFuture<Void> close() {
+        checkState(isOpen.get(), DB_NOT_OPEN);
         CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
                 .values()
                 .stream()
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index d913a38..e2e6322 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -33,6 +33,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -169,7 +171,7 @@
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
 
-            if (leadership.leader().equals(clusterService.getLocalNode().id()) &&
+            if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
                 // See if we need to let some partitions go