Unit tests for PartitionManager

Change-Id: I721ed6489ce19cb78ce9e2f150dfed90882f3b0e
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
new file mode 100644
index 0000000..818edb9
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Test adapter for leadership service.
+ */
+public class LeadershipServiceAdapter implements LeadershipService {
+
+    @Override
+    public NodeId getLeader(String path) {
+        return null;
+    }
+
+    @Override
+    public Leadership getLeadership(String path) {
+        return null;
+    }
+
+    @Override
+    public Set<String> ownedTopics(NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public void runForLeadership(String path) {
+
+    }
+
+    @Override
+    public void withdraw(String path) {
+
+    }
+
+    @Override
+    public Map<String, Leadership> getLeaderBoard() {
+        return null;
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 72590aa..36475b8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -57,7 +57,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    private static final int NUM_PARTITIONS = 14;
+    static final int NUM_PARTITIONS = 14;
     private static final int BACKOFF_TIME = 2;
     private static final int CHECK_PERIOD = 10;
 
@@ -90,6 +90,17 @@
         clusterService.removeListener(clusterListener);
     }
 
+    /**
+     * Sets the specified executor to be used for scheduling background tasks.
+     *
+     * @param executor scheduled executor service for background tasks
+     * @return this PartitionManager
+     */
+    public PartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
+        this.executor = executor;
+        return this;
+    }
+
     private String getPartitionPath(int i) {
         return ELECTION_PREFIX + i;
     }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
new file mode 100644
index 0000000..c3b529e
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.intent.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.NullScheduledExecutor;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.intent.Key;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the PartitionManager class.
+ */
+public class PartitionManagerTest {
+
+    private final LeadershipEvent event
+            = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED,
+                                  new Leadership(ELECTION_PREFIX + "0",
+                                                 MY_NODE_ID, 0, 0));
+
+    private static final NodeId MY_NODE_ID = new NodeId("local");
+    private static final NodeId OTHER_NODE_ID = new NodeId("other");
+    private static final NodeId INACTIVE_NODE_ID = new NodeId("inactive");
+
+    private static final String ELECTION_PREFIX = "intent-partition-";
+
+        private LeadershipService leadershipService;
+    private LeadershipEventListener leaderListener;
+
+    private PartitionManager partitionManager;
+
+    @Before
+    public void setUp() {
+        leadershipService = createMock(LeadershipService.class);
+
+        leadershipService.addListener(anyObject(LeadershipEventListener.class));
+        expectLastCall().andDelegateTo(new TestLeadershipService());
+        leadershipService.runForLeadership(anyString());
+        expectLastCall().anyTimes();
+
+        partitionManager = new PartitionManager()
+                .withScheduledExecutor(new NullScheduledExecutor());
+
+        partitionManager.clusterService = new TestClusterService();
+        partitionManager.leadershipService = leadershipService;
+    }
+
+    /**
+     * Configures a mock leadership service to have the specified number of
+     * partitions owned by the local node and all other partitions owned by a
+     * (fake) remote node.
+     *
+     * @param numMine number of partitions that should be owned by the local node
+     */
+    private void setUpLeadershipService(int numMine) {
+        Map<String, Leadership> leaderBoard = new HashMap<>();
+
+        for (int i = 0; i < numMine; i++) {
+            expect(leadershipService.getLeader(ELECTION_PREFIX + i))
+                    .andReturn(MY_NODE_ID).anyTimes();
+            leaderBoard.put(ELECTION_PREFIX + i,
+                            new Leadership(ELECTION_PREFIX + i, MY_NODE_ID, 0, 0));
+        }
+
+        for (int i = numMine; i < PartitionManager.NUM_PARTITIONS; i++) {
+            expect(leadershipService.getLeader(ELECTION_PREFIX + i))
+                    .andReturn(OTHER_NODE_ID).anyTimes();
+
+            leaderBoard.put(ELECTION_PREFIX + i,
+                            new Leadership(ELECTION_PREFIX + i, OTHER_NODE_ID, 0, 0));
+        }
+
+        expect(leadershipService.getLeaderBoard()).andReturn(leaderBoard).anyTimes();
+    }
+
+    /**
+     * Tests that the PartitionManager's activate method correctly runs for
+     * all the leader elections that it should.
+     */
+    @Test
+    public void testActivate() {
+        reset(leadershipService);
+
+        leadershipService.addListener(anyObject(LeadershipEventListener.class));
+
+        for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
+            leadershipService.runForLeadership(ELECTION_PREFIX + i);
+        }
+
+        replay(leadershipService);
+
+        partitionManager.activate();
+
+        verify(leadershipService);
+    }
+
+    /**
+     * Tests that the isMine method returns the correct result based on the
+     * underlying leadership service data.
+     */
+    @Test
+    public void testIsMine() {
+        // We'll own only the first partition
+        setUpLeadershipService(1);
+        replay(leadershipService);
+
+        Key myKey = new ControllableHashKey(0);
+        Key notMyKey = new ControllableHashKey(1);
+
+        assertTrue(partitionManager.isMine(myKey));
+        assertFalse(partitionManager.isMine(notMyKey));
+
+        // Make us the owner of 4 partitions now
+        reset(leadershipService);
+        setUpLeadershipService(4);
+        replay(leadershipService);
+
+        assertTrue(partitionManager.isMine(myKey));
+        // notMyKey is now my key because because we're in control of that
+        // partition now
+        assertTrue(partitionManager.isMine(notMyKey));
+
+        assertFalse(partitionManager.isMine(new ControllableHashKey(4)));
+    }
+
+    /**
+     * Tests sending in LeadershipServiceEvents in the case when we have
+     * too many partitions. The event will trigger the partition manager to
+     * reassess how many partitions it has and relinquish some.
+     */
+    @Test
+    public void testRelinquish() {
+        // We have all the partitions so we'll need to relinquish some
+        setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
+
+        leadershipService.withdraw(anyString());
+        expectLastCall().times(7);
+
+        replay(leadershipService);
+
+        partitionManager.activate();
+        // Send in the event
+        leaderListener.event(event);
+
+        verify(leadershipService);
+    }
+
+    /**
+     * Tests sending in LeadershipServiceEvents in the case when we have the
+     * right amount or too many partitions. These events will not trigger any
+     * partition reassignments.
+     */
+    @Test
+    public void testNoRelinquish() {
+        // Partitions are already perfectly balanced among the two active instances
+        setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2);
+        replay(leadershipService);
+
+        partitionManager.activate();
+
+        // Send in the event
+        leaderListener.event(event);
+
+        verify(leadershipService);
+
+        reset(leadershipService);
+        // We have a smaller share than we should
+        setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2 - 1);
+        replay(leadershipService);
+
+        // Send in the event
+        leaderListener.event(event);
+
+        verify(leadershipService);
+    }
+
+    /**
+     * LeadershipService that allows us to grab a reference to
+     * PartitionManager's LeadershipEventListener.
+     */
+    public class TestLeadershipService extends LeadershipServiceAdapter {
+        @Override
+        public void addListener(LeadershipEventListener listener) {
+            leaderListener = listener;
+        }
+    }
+
+    /**
+     * ClusterService set up with a very simple cluster - 3 nodes, one is the
+     * current node, one is a different active node, and one is an inactive node.
+     */
+    private class TestClusterService extends ClusterServiceAdapter {
+
+        private final ControllerNode self =
+                new DefaultControllerNode(MY_NODE_ID, IpAddress.valueOf(1));
+        private final ControllerNode otherNode =
+                new DefaultControllerNode(OTHER_NODE_ID, IpAddress.valueOf(2));
+        private final ControllerNode inactiveNode =
+                new DefaultControllerNode(INACTIVE_NODE_ID, IpAddress.valueOf(3));
+
+        Set<ControllerNode> nodes;
+
+        public TestClusterService() {
+            nodes = new HashSet<>();
+            nodes.add(self);
+            nodes.add(otherNode);
+            nodes.add(inactiveNode);
+        }
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return self;
+        }
+
+        @Override
+        public Set<ControllerNode> getNodes() {
+            return nodes;
+        }
+
+        @Override
+        public ControllerNode getNode(NodeId nodeId) {
+            return nodes.stream()
+                    .filter(c -> c.id().equals(nodeId))
+                    .findFirst()
+                    .get();
+        }
+
+        @Override
+        public ControllerNode.State getState(NodeId nodeId) {
+            return nodeId.equals(INACTIVE_NODE_ID) ? ControllerNode.State.INACTIVE :
+                   ControllerNode.State.ACTIVE;
+        }
+    }
+
+    /**
+     * A key that always hashes to a value provided to the constructor. This
+     * allows us to control the hash of the key for unit tests.
+     */
+    private class ControllableHashKey extends Key {
+
+        protected ControllableHashKey(long hash) {
+            super(hash);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(hash());
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof ControllableHashKey)) {
+                return false;
+            }
+
+            ControllableHashKey that = (ControllableHashKey) obj;
+
+            return Objects.equals(this.hash(), that.hash());
+        }
+    }
+}
diff --git a/utils/junit/src/main/java/org/onlab/junit/NullScheduledExecutor.java b/utils/junit/src/main/java/org/onlab/junit/NullScheduledExecutor.java
new file mode 100644
index 0000000..6d95970
--- /dev/null
+++ b/utils/junit/src/main/java/org/onlab/junit/NullScheduledExecutor.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.junit;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A scheduled executor service that does not do any of the work scheduled to it.
+ * <p>
+ * This is useful for testing when you want to disable a background scheduled
+ * task.
+ * </p>
+ */
+public class NullScheduledExecutor implements ScheduledExecutorService {
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay,
+                                       TimeUnit unit) {
+        return null;
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
+                                           TimeUnit unit) {
+        return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period, TimeUnit unit) {
+        return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit) {
+        return null;
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return false;
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return null;
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return null;
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout,
+            TimeUnit unit) throws InterruptedException {
+        return null;
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        return null;
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+                           long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return null;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+
+    }
+}