Implemented a PartitionManager to keep track of partitions
assigned to instances.

Also updated GossipIntentStore a little to the new API. This work is not
yet complete.

Change-Id: I64d1779b669de51c35da686b65006a80ac4819b0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index ba348d4..f3917c9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -26,11 +26,14 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.intent.BatchWrite;
 import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
 import org.onosproject.net.intent.IntentId;
+import org.onosproject.net.intent.IntentOperation;
 import org.onosproject.net.intent.IntentState;
 import org.onosproject.net.intent.IntentStore;
 import org.onosproject.net.intent.IntentStoreDelegate;
+import org.onosproject.net.intent.Key;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.impl.EventuallyConsistentMap;
@@ -66,12 +69,18 @@
 
     private EventuallyConsistentMap<IntentId, List<Intent>> installables;
 
+    // Map of intent key => pending intent operation
+    private EventuallyConsistentMap<String, IntentOperation> pending;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PartitionService partitionService;
+
     @Activate
     public void activate() {
         KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
@@ -93,16 +102,25 @@
                                                          intentSerializer,
                                                          new WallclockClockManager<>());
 
+        pending = new EventuallyConsistentMapImpl<>("intent-pending",
+                                                    clusterService,
+                                                    clusterCommunicator,
+                                                    intentSerializer, // TODO
+                                                    new WallclockClockManager<>());
+
         intentStates.addListener(new InternalIntentStatesListener());
+        pending.addListener(new InternalPendingListener());
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+
         intents.destroy();
         intentStates.destroy();
         installables.destroy();
+        pending.destroy();
 
         log.info("Stopped");
     }
@@ -148,6 +166,9 @@
                 intents.put(intent.id(), intent);
                 intentStates.put(intent.id(), INSTALL_REQ);
 
+                // TODO remove from pending?
+
+
                 break;
             case REMOVE_INTENT:
                 checkArgument(op.args().size() == 1,
@@ -193,6 +214,41 @@
         return failed;
     }
 
+    @Override
+    public void write(IntentData newData) {
+        // TODO
+    }
+
+    @Override
+    public void batchWrite(Iterable<IntentData> updates) {
+        // TODO
+    }
+
+    @Override
+    public Intent getIntent(Key key) {
+        return null; // TODO
+    }
+
+    @Override
+    public IntentData getIntentData(Key key) {
+        return null; // TODO
+    }
+
+    @Override
+    public void addPending(IntentData data) {
+        // TODO implement
+
+        // Check the intent versions
+        //pending.put(op.key(), op);
+    }
+
+    @Override
+    public boolean isMaster(Intent intent) {
+        // TODO
+        //return partitionService.isMine(intent.key());
+        return false;
+    }
+
     private void notifyDelegateIfNotNull(IntentEvent event) {
         if (event != null) {
             notifyDelegate(event);
@@ -219,5 +275,22 @@
         }
     }
 
+    private final class InternalPendingListener implements
+            EventuallyConsistentMapListener<String, IntentOperation> {
+        @Override
+        public void event(
+                EventuallyConsistentMapEvent<String, IntentOperation> event) {
+            if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
+                // The pending intents map has been updated. If we are master for
+                // this intent's partition, notify the Manager that they should do
+                // some work.
+                if (isMaster(event.value().intent())) {
+                    // TODO delegate.process(event.value());
+                    log.debug("implement this");
+                }
+            }
+        }
+    }
+
 }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
new file mode 100644
index 0000000..3b3e538
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.intent.impl;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * Identifies a partition of the intent keyspace which will be assigned to and
+ * processed by a single ONOS instance at a time.
+ */
+public class PartitionId {
+    private final int id;
+
+    /**
+     * Creates a new partition ID.
+     *
+     * @param id the partition ID
+     */
+    PartitionId(int id) {
+        this.id = id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PartitionId)) {
+            return false;
+        }
+
+        PartitionId that = (PartitionId) o;
+        return Objects.equals(this.id, that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("partition ID", id)
+                .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
new file mode 100644
index 0000000..eb831c5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.intent.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manages the assignment of intent keyspace partitions to instances.
+ */
+@Component(immediate = true)
+@Service
+public class PartitionManager implements PartitionService {
+
+    private static final Logger log = LoggerFactory.getLogger(PartitionManager.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    // TODO make configurable
+    private static final int NUM_PARTITIONS = 100;
+
+    private static final String ELECTION_PREFIX = "intent-partition-";
+
+    private LeadershipEventListener leaderListener = new InternalLeadershipListener();
+
+    private Set<PartitionId> myPartitions;
+
+    @Activate
+    public void activate() {
+        myPartitions = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+        leadershipService.addListener(leaderListener);
+
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            leadershipService.runForLeadership(ELECTION_PREFIX + i);
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        leadershipService.removeListener(leaderListener);
+    }
+
+    private PartitionId getPartitionForKey(String intentKey) {
+        return new PartitionId(intentKey.hashCode() % NUM_PARTITIONS);
+    }
+
+    @Override
+    public boolean isMine(String intentKey) {
+        return checkNotNull(
+                myPartitions.contains(getPartitionForKey(intentKey)));
+    }
+
+    private final class InternalLeadershipListener implements LeadershipEventListener {
+
+        @Override
+        public void event(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+            // update internal state about which partitions I'm leader of
+            if (leadership.leader().equals(clusterService.getLocalNode().id()) &&
+                    leadership.topic().startsWith(ELECTION_PREFIX)) {
+
+                // Parse out the partition ID
+                String[] splitted = leadership.topic().split("-");
+                if (splitted.length != 3) {
+                    log.warn("Couldn't parse leader election topic {}", leadership.topic());
+                    return;
+                }
+
+                int partitionId;
+                try {
+                    partitionId = Integer.parseInt(splitted[2]);
+                } catch (NumberFormatException e) {
+                    log.warn("Couldn't parse partition ID {}", splitted[2]);
+                    return;
+                }
+
+                if (event.type() == LeadershipEvent.Type.LEADER_ELECTED) {
+                    myPartitions.add(new PartitionId(partitionId));
+                } else if (event.type() == LeadershipEvent.Type.LEADER_BOOTED) {
+                    myPartitions.remove(new PartitionId(partitionId));
+                }
+            }
+
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
new file mode 100644
index 0000000..1062b65
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.intent.impl;
+
+/**
+ * Service for interacting with the partition-to-instance assignments.
+ */
+public interface PartitionService {
+
+    /**
+     * Returns whether the given intent key is in a partition owned by this
+     * instance or not.
+     *
+     * @param intentKey intent key to query
+     * @return true if the key is owned by this instance, otherwise false
+     */
+    boolean isMine(String intentKey);
+
+    // TODO add API for rebalancing partitions
+}