Updates for SDN-IP:

 * Use the new Leadership Service instead of Distributed Lock to
   elect the SDN-IP Leader
 * Reimplement the SDN-IP Intent Synchronizer. In the new implementation
   the Point-to-Point Peer intents are also synchronized by and pushed
   only by the Leader (same as the Multipoint-to-SinglePoint Route intents)
 * Minor cleanups

Change-Id: I8e142781211a1d0f2d362875bc28fd05d843cd4b
diff --git a/apps/sdnip/pom.xml b/apps/sdnip/pom.xml
index 8f4a7c2..a7ece45 100644
--- a/apps/sdnip/pom.xml
+++ b/apps/sdnip/pom.xml
@@ -49,6 +49,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+      <version>4.0</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.onlab.onos</groupId>
       <artifactId>onlab-thirdparty</artifactId>
     </dependency>
@@ -69,10 +75,12 @@
       <artifactId>onos-cli</artifactId>
       <version>${project.version}</version>
     </dependency>
+
     <dependency>
       <groupId>org.apache.karaf.shell</groupId>
       <artifactId>org.apache.karaf.shell.console</artifactId>
     </dependency>
+
     <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/IntentSynchronizer.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/IntentSynchronizer.java
index 55a9d24..530e6d3 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/IntentSynchronizer.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/IntentSynchronizer.java
@@ -20,33 +20,37 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
 import org.onlab.onos.net.flow.criteria.Criterion;
 import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentOperations;
 import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
+import org.onlab.onos.net.intent.PointToPointIntent;
 import org.onlab.packet.Ip4Prefix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public class IntentSynchronizer {
     private static final Logger log =
         LoggerFactory.getLogger(IntentSynchronizer.class);
 
     private final ApplicationId appId;
     private final IntentService intentService;
-    private final Map<Ip4Prefix, MultiPointToSinglePointIntent> pushedRouteIntents;
+    private final Map<IntentKey, PointToPointIntent> peerIntents;
+    private final Map<Ip4Prefix, MultiPointToSinglePointIntent> routeIntents;
 
     //
     // State to deal with SDN-IP Leader election and pushing Intents
@@ -65,7 +69,8 @@
     IntentSynchronizer(ApplicationId appId, IntentService intentService) {
         this.appId = appId;
         this.intentService = intentService;
-        pushedRouteIntents = new ConcurrentHashMap<>();
+        peerIntents = new ConcurrentHashMap<>();
+        routeIntents = new ConcurrentHashMap<>();
 
         bgpIntentsSynchronizerExecutor = Executors.newSingleThreadExecutor(
                 new ThreadFactoryBuilder()
@@ -88,28 +93,47 @@
      * Stops the synchronizer.
      */
     public void stop() {
-        // Stop the thread(s)
-        bgpIntentsSynchronizerExecutor.shutdownNow();
+        synchronized (this) {
+            // Stop the thread(s)
+            bgpIntentsSynchronizerExecutor.shutdownNow();
 
-        //
-        // Withdraw all SDN-IP intents
-        //
-        if (!isElectedLeader) {
-            return;         // Nothing to do: not the leader anymore
-        }
-        log.debug("Withdrawing all SDN-IP Route Intents...");
-        for (Intent intent : intentService.getIntents()) {
-            if (!(intent instanceof MultiPointToSinglePointIntent)
-                || !intent.appId().equals(appId)) {
-                continue;
+            //
+            // Withdraw all SDN-IP intents
+            //
+            if (!isElectedLeader) {
+                return;         // Nothing to do: not the leader anymore
             }
-            intentService.withdraw(intent);
-        }
 
-        pushedRouteIntents.clear();
+            //
+            // Build a batch operation to withdraw all intents from this
+            // application.
+            //
+            log.debug("Withdrawing all SDN-IP Intents...");
+            IntentOperations.Builder builder = IntentOperations.builder();
+            for (Intent intent : intentService.getIntents()) {
+                // Skip the intents from other applications
+                if (!intent.appId().equals(appId)) {
+                    continue;
+                }
+
+                // Skip the intents that are already withdrawn
+                IntentState intentState =
+                    intentService.getIntentState(intent.id());
+                if (intentState.equals(IntentState.WITHDRAWING) ||
+                    intentState.equals(IntentState.WITHDRAWN)) {
+                    continue;
+                }
+
+                builder.addWithdrawOperation(intent.id());
+            }
+            intentService.execute(builder.build());
+            leaderChanged(false);
+
+            peerIntents.clear();
+            routeIntents.clear();
+        }
     }
 
-    //@Override TODO hook this up to something
     public void leaderChanged(boolean isLeader) {
         log.debug("Leader changed: {}", isLeader);
 
@@ -128,18 +152,18 @@
     }
 
     /**
-     * Gets the pushed route intents.
+     * Gets the route intents.
      *
-     * @return the pushed route intents
+     * @return the route intents
      */
-    public Collection<MultiPointToSinglePointIntent> getPushedRouteIntents() {
-        List<MultiPointToSinglePointIntent> pushedIntents = new LinkedList<>();
+    public Collection<MultiPointToSinglePointIntent> getRouteIntents() {
+        List<MultiPointToSinglePointIntent> result = new LinkedList<>();
 
         for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
-            pushedRouteIntents.entrySet()) {
-            pushedIntents.add(entry.getValue());
+            routeIntents.entrySet()) {
+            result.add(entry.getValue());
         }
-        return pushedIntents;
+        return result;
     }
 
     /**
@@ -158,11 +182,11 @@
                     intentsSynchronizerSemaphore.drainPermits();
                 } catch (InterruptedException e) {
                     log.debug("Interrupted while waiting to become " +
-                                      "Intent Synchronization leader");
+                              "Intent Synchronization leader");
                     interrupted = true;
                     break;
                 }
-                syncIntents();
+                synchronizeIntents();
             }
         } finally {
             if (interrupted) {
@@ -172,6 +196,29 @@
     }
 
     /**
+     * Submits a collection of point-to-point intents.
+     *
+     * @param intents the intents to submit
+     */
+    void submitPeerIntents(Collection<PointToPointIntent> intents) {
+        synchronized (this) {
+            // Store the intents in memory
+            for (PointToPointIntent intent : intents) {
+                peerIntents.put(new IntentKey(intent), intent);
+            }
+
+            // Push the intents
+            if (isElectedLeader && isActivatedLeader) {
+                log.debug("Submitting all SDN-IP Peer Intents...");
+                // TODO: We should use a single Intent batch operation
+                for (Intent intent : intents) {
+                    intentService.submit(intent);
+                }
+            }
+        }
+    }
+
+    /**
      * Submits a multi-point-to-single-point intent.
      *
      * @param prefix the IPv4 matching prefix for the intent to submit
@@ -180,14 +227,21 @@
     void submitRouteIntent(Ip4Prefix prefix,
                            MultiPointToSinglePointIntent intent) {
         synchronized (this) {
+            MultiPointToSinglePointIntent oldIntent =
+                routeIntents.put(prefix, intent);
+
             if (isElectedLeader && isActivatedLeader) {
                 log.debug("Intent installation: adding Intent for prefix: {}",
                           prefix);
+                if (oldIntent != null) {
+                    //
+                    // TODO: Short-term solution to explicitly withdraw
+                    // instead of using "replace" operation.
+                    //
+                    intentService.withdraw(oldIntent);
+                }
                 intentService.submit(intent);
             }
-
-            // Maintain the Intent
-            pushedRouteIntents.put(prefix, intent);
         }
     }
 
@@ -199,11 +253,11 @@
     void withdrawRouteIntent(Ip4Prefix prefix) {
         synchronized (this) {
             MultiPointToSinglePointIntent intent =
-                pushedRouteIntents.remove(prefix);
+                routeIntents.remove(prefix);
 
             if (intent == null) {
-                log.debug("There is no intent in pushedRouteIntents to delete " +
-                          "for prefix: {}", prefix);
+                log.debug("There is no Intent in routeIntents to " +
+                          "delete for prefix: {}", prefix);
                 return;
             }
 
@@ -216,191 +270,312 @@
     }
 
     /**
-     * Performs Intents Synchronization between the internally stored Route
-     * Intents and the installed Route Intents.
+     * Synchronize the in-memory Intents with the Intents in the Intent
+     * framework.
      */
-    private void syncIntents() {
+    void synchronizeIntents() {
         synchronized (this) {
+
+            Map<IntentKey, Intent> localIntents = new HashMap<>();
+            Map<IntentKey, Intent> fetchedIntents = new HashMap<>();
+            Collection<Intent> storeInMemoryIntents = new LinkedList<>();
+            Collection<Intent> addIntents = new LinkedList<>();
+            Collection<Intent> deleteIntents = new LinkedList<>();
+
             if (!isElectedLeader) {
                 return;         // Nothing to do: not the leader anymore
             }
-            log.debug("Syncing SDN-IP Route Intents...");
+            log.debug("Syncing SDN-IP Intents...");
 
-            Map<Ip4Prefix, MultiPointToSinglePointIntent> fetchedIntents =
-                    new HashMap<>();
+            // Prepare the local intents
+            for (Intent intent : routeIntents.values()) {
+                localIntents.put(new IntentKey(intent), intent);
+            }
+            for (Intent intent : peerIntents.values()) {
+                localIntents.put(new IntentKey(intent), intent);
+            }
 
-            //
-            // Fetch all intents, and classify the Multi-Point-to-Point Intents
-            // based on the matching prefix.
-            //
+            // Fetch all intents for this application
             for (Intent intent : intentService.getIntents()) {
-
-                if (!(intent instanceof MultiPointToSinglePointIntent)
-                        || !intent.appId().equals(appId)) {
+                if (!intent.appId().equals(appId)) {
                     continue;
                 }
-                MultiPointToSinglePointIntent mp2pIntent =
-                        (MultiPointToSinglePointIntent) intent;
-
-                Criterion c =
-                    mp2pIntent.selector().getCriterion(Criterion.Type.IPV4_DST);
-                if (c != null && c instanceof IPCriterion) {
-                    IPCriterion ipCriterion = (IPCriterion) c;
-                    Ip4Prefix ip4Prefix = ipCriterion.ip().getIp4Prefix();
-                    if (ip4Prefix == null) {
-                        // TODO: For now we support only IPv4
-                        continue;
-                    }
-                    fetchedIntents.put(ip4Prefix, mp2pIntent);
-                } else {
-                    log.warn("No IPV4_DST criterion found for intent {}",
-                            mp2pIntent.id());
-                }
-
+                fetchedIntents.put(new IntentKey(intent), intent);
             }
 
-            //
-            // Compare for each prefix the local IN-MEMORY Intents with the
-            // FETCHED Intents:
-            //  - If the IN-MEMORY Intent is same as the FETCHED Intent, store
-            //    the FETCHED Intent in the local memory (i.e., override the
-            //    IN-MEMORY Intent) to preserve the original Intent ID
-            //  - if the IN-MEMORY Intent is not same as the FETCHED Intent,
-            //    delete the FETCHED Intent, and push/install the IN-MEMORY
-            //    Intent.
-            //  - If there is an IN-MEMORY Intent for a prefix, but no FETCHED
-            //    Intent for same prefix, then push/install the IN-MEMORY
-            //    Intent.
-            //  - If there is a FETCHED Intent for a prefix, but no IN-MEMORY
-            //    Intent for same prefix, then delete/withdraw the FETCHED
-            //    Intent.
-            //
-            Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
-                    storeInMemoryIntents = new LinkedList<>();
-            Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
-                    addIntents = new LinkedList<>();
-            Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
-                    deleteIntents = new LinkedList<>();
-            for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
-                    pushedRouteIntents.entrySet()) {
-                Ip4Prefix prefix = entry.getKey();
-                MultiPointToSinglePointIntent inMemoryIntent =
-                        entry.getValue();
-                MultiPointToSinglePointIntent fetchedIntent =
-                        fetchedIntents.get(prefix);
-
-                if (fetchedIntent == null) {
-                    //
-                    // No FETCHED Intent for same prefix: push the IN-MEMORY
-                    // Intent.
-                    //
-                    addIntents.add(Pair.of(prefix, inMemoryIntent));
-                    continue;
-                }
-
-                IntentState state = intentService.getIntentState(fetchedIntent.id());
-                if (state == IntentState.WITHDRAWING ||
-                        state == IntentState.WITHDRAWN) {
-                    // The intent has been withdrawn but according to our route
-                    // table it should be installed. We'll reinstall it.
-                    addIntents.add(Pair.of(prefix, inMemoryIntent));
-                }
-
-                //
-                // If IN-MEMORY Intent is same as the FETCHED Intent,
-                // store the FETCHED Intent in the local memory.
-                //
-                if (compareMultiPointToSinglePointIntents(inMemoryIntent,
-                                                          fetchedIntent)) {
-                    storeInMemoryIntents.add(Pair.of(prefix, fetchedIntent));
-                } else {
-                    //
-                    // The IN-MEMORY Intent is not same as the FETCHED Intent,
-                    // hence delete the FETCHED Intent, and install the
-                    // IN-MEMORY Intent.
-                    //
-                    deleteIntents.add(Pair.of(prefix, fetchedIntent));
-                    addIntents.add(Pair.of(prefix, inMemoryIntent));
-                }
-                fetchedIntents.remove(prefix);
-            }
-
-            //
-            // Any remaining FETCHED Intents have to be deleted/withdrawn
-            //
-            for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
-                    fetchedIntents.entrySet()) {
-                Ip4Prefix prefix = entry.getKey();
-                MultiPointToSinglePointIntent fetchedIntent = entry.getValue();
-                deleteIntents.add(Pair.of(prefix, fetchedIntent));
-            }
+            computeIntentsDelta(localIntents, fetchedIntents,
+                                storeInMemoryIntents, addIntents,
+                                deleteIntents);
 
             //
             // Perform the actions:
             // 1. Store in memory fetched intents that are same. Can be done
             //    even if we are not the leader anymore
-            // 2. Delete intents: check if the leader before each operation
-            // 3. Add intents: check if the leader before each operation
+            // 2. Delete intents: check if the leader before the operation
+            // 3. Add intents: check if the leader before the operation
             //
-            for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
-                    storeInMemoryIntents) {
-                Ip4Prefix prefix = pair.getLeft();
-                MultiPointToSinglePointIntent intent = pair.getRight();
-                log.debug("Intent synchronization: updating in-memory " +
-                                  "Intent for prefix: {}", prefix);
-                pushedRouteIntents.put(prefix, intent);
-            }
-            //
-            isActivatedLeader = true;           // Allow push of Intents
-            for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
-                    deleteIntents) {
-                Ip4Prefix prefix = pair.getLeft();
-                MultiPointToSinglePointIntent intent = pair.getRight();
-                if (!isElectedLeader) {
-                    isActivatedLeader = false;
-                    return;
+            for (Intent intent : storeInMemoryIntents) {
+                // Store the intent in memory based on its type
+                if (intent instanceof MultiPointToSinglePointIntent) {
+                    MultiPointToSinglePointIntent mp2pIntent =
+                        (MultiPointToSinglePointIntent) intent;
+                    // Find the IP prefix
+                    Criterion c =
+                        mp2pIntent.selector().getCriterion(Criterion.Type.IPV4_DST);
+                    if (c != null && c instanceof IPCriterion) {
+                        IPCriterion ipCriterion = (IPCriterion) c;
+                        Ip4Prefix ip4Prefix = ipCriterion.ip().getIp4Prefix();
+                        if (ip4Prefix == null) {
+                            // TODO: For now we support only IPv4
+                            continue;
+                        }
+                        log.debug("Intent synchronization: updating " +
+                                  "in-memory Route Intent for prefix {}",
+                                  ip4Prefix);
+                        routeIntents.put(ip4Prefix, mp2pIntent);
+                    } else {
+                        log.warn("No IPV4_DST criterion found for Intent {}",
+                                 mp2pIntent.id());
+                    }
+                    continue;
                 }
-                log.debug("Intent synchronization: deleting Intent for " +
-                                  "prefix: {}", prefix);
-                intentService.withdraw(intent);
-            }
-            //
-            for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
-                    addIntents) {
-                Ip4Prefix prefix = pair.getLeft();
-                MultiPointToSinglePointIntent intent = pair.getRight();
-                if (!isElectedLeader) {
-                    isActivatedLeader = false;
-                    return;
+                if (intent instanceof PointToPointIntent) {
+                    PointToPointIntent p2pIntent = (PointToPointIntent) intent;
+                    log.debug("Intent synchronization: updating " +
+                              "in-memory Peer Intent {}", p2pIntent);
+                    peerIntents.put(new IntentKey(intent), p2pIntent);
+                    continue;
                 }
-                log.debug("Intent synchronization: adding Intent for " +
-                                  "prefix: {}", prefix);
-                intentService.submit(intent);
+            }
+
+            // Withdraw Intents
+            IntentOperations.Builder builder = IntentOperations.builder();
+            for (Intent intent : deleteIntents) {
+                builder.addWithdrawOperation(intent.id());
+                log.debug("Intent synchronization: deleting Intent {}",
+                          intent);
             }
             if (!isElectedLeader) {
                 isActivatedLeader = false;
+                return;
+            }
+            intentService.execute(builder.build());
+
+            // Add Intents
+            builder = IntentOperations.builder();
+            for (Intent intent : addIntents) {
+                builder.addSubmitOperation(intent);
+                log.debug("Intent synchronization: adding Intent {}", intent);
+            }
+            if (!isElectedLeader) {
+                isActivatedLeader = false;
+                return;
+            }
+            intentService.execute(builder.build());
+
+            if (isElectedLeader) {
+                isActivatedLeader = true;       // Allow push of Intents
+            } else {
+                isActivatedLeader = false;
             }
             log.debug("Syncing SDN-IP routes completed.");
         }
     }
 
     /**
-     * Compares two Multi-point to Single Point Intents whether they represent
-     * same logical intention.
+     * Computes the delta in two sets of Intents: local in-memory Intents,
+     * and intents fetched from the Intent framework.
      *
-     * @param intent1 the first Intent to compare
-     * @param intent2 the second Intent to compare
-     * @return true if both Intents represent same logical intention, otherwise
-     * false
+     * @param localIntents the local in-memory Intents
+     * @param fetchedIntents the Intents fetched from the Intent framework
+     * @param storeInMemoryIntents the Intents that should be stored in memory.
+     * Note: This Collection must be allocated by the caller, and it will
+     * be populated by this method.
+     * @param addIntents the Intents that should be added to the Intent
+     * framework. Note: This Collection must be allocated by the caller, and
+     * it will be populated by this method.
+     * @param deleteIntents the Intents that should be deleted from the Intent
+     * framework. Note: This Collection must be allocated by the caller, and
+     * it will be populated by this method.
      */
-    private boolean compareMultiPointToSinglePointIntents(
-            MultiPointToSinglePointIntent intent1,
-            MultiPointToSinglePointIntent intent2) {
+    private void computeIntentsDelta(
+                                final Map<IntentKey, Intent> localIntents,
+                                final Map<IntentKey, Intent> fetchedIntents,
+                                Collection<Intent> storeInMemoryIntents,
+                                Collection<Intent> addIntents,
+                                Collection<Intent> deleteIntents) {
 
-        return Objects.equal(intent1.appId(), intent2.appId()) &&
-                Objects.equal(intent1.selector(), intent2.selector()) &&
-                Objects.equal(intent1.treatment(), intent2.treatment()) &&
-                Objects.equal(intent1.ingressPoints(), intent2.ingressPoints()) &&
-                Objects.equal(intent1.egressPoint(), intent2.egressPoint());
+        //
+        // Compute the deltas between the LOCAL in-memory Intents and the
+        // FETCHED Intents:
+        //  - If an Intent is in both the LOCAL and FETCHED sets:
+        //    If the FETCHED Intent is WITHDRAWING or WITHDRAWN, then
+        //    the LOCAL Intent should be added/installed; otherwise the
+        //    FETCHED intent should be stored in the local memory
+        //    (i.e., override the LOCAL Intent) to preserve the original
+        //    Intent ID.
+        //  - if a LOCAL Intent is not in the FETCHED set, then the LOCAL
+        //    Intent should be added/installed.
+        //  - If a FETCHED Intent is not in the LOCAL set, then the FETCHED
+        //    Intent should be deleted/withdrawn.
+        //
+        for (Map.Entry<IntentKey, Intent> entry : localIntents.entrySet()) {
+            IntentKey intentKey = entry.getKey();
+            Intent localIntent = entry.getValue();
+            Intent fetchedIntent = fetchedIntents.get(intentKey);
+
+            if (fetchedIntent == null) {
+                //
+                // No FETCHED Intent found: push the LOCAL Intent.
+                //
+                addIntents.add(localIntent);
+                continue;
+            }
+
+            IntentState state =
+                intentService.getIntentState(fetchedIntent.id());
+            if (state == IntentState.WITHDRAWING ||
+                state == IntentState.WITHDRAWN) {
+                // The intent has been withdrawn but according to our route
+                // table it should be installed. We'll reinstall it.
+                addIntents.add(localIntent);
+                continue;
+            }
+            storeInMemoryIntents.add(fetchedIntent);
+        }
+
+        for (Map.Entry<IntentKey, Intent> entry : fetchedIntents.entrySet()) {
+            IntentKey intentKey = entry.getKey();
+            Intent fetchedIntent = entry.getValue();
+            Intent localIntent = localIntents.get(intentKey);
+
+            if (localIntent != null) {
+                continue;
+            }
+
+            IntentState state =
+                intentService.getIntentState(fetchedIntent.id());
+            if (state == IntentState.WITHDRAWING ||
+                state == IntentState.WITHDRAWN) {
+                // Nothing to do. The intent has been already withdrawn.
+                continue;
+            }
+            //
+            // No LOCAL Intent found: delete/withdraw the FETCHED Intent.
+            //
+            deleteIntents.add(fetchedIntent);
+        }
+    }
+
+    /**
+     * Helper class that can be used to compute the key for an Intent by
+     * by excluding the Intent ID.
+     */
+    static final class IntentKey {
+        private final Intent intent;
+
+        /**
+         * Constructor.
+         *
+         * @param intent the intent to use
+         */
+        IntentKey(Intent intent) {
+            checkArgument((intent instanceof MultiPointToSinglePointIntent) ||
+                          (intent instanceof PointToPointIntent),
+                          "Intent type not recognized", intent);
+            this.intent = intent;
+        }
+
+        /**
+         * Compares two Multi-Point to Single-Point Intents whether they
+         * represent same logical intention.
+         *
+         * @param intent1 the first Intent to compare
+         * @param intent2 the second Intent to compare
+         * @return true if both Intents represent same logical intention,
+         * otherwise false
+         */
+        static boolean equalIntents(MultiPointToSinglePointIntent intent1,
+                                    MultiPointToSinglePointIntent intent2) {
+            return Objects.equals(intent1.appId(), intent2.appId()) &&
+                Objects.equals(intent1.selector(), intent2.selector()) &&
+                Objects.equals(intent1.treatment(), intent2.treatment()) &&
+                Objects.equals(intent1.ingressPoints(), intent2.ingressPoints()) &&
+                Objects.equals(intent1.egressPoint(), intent2.egressPoint());
+        }
+
+        /**
+         * Compares two Point-to-Point Intents whether they represent
+         * same logical intention.
+         *
+         * @param intent1 the first Intent to compare
+         * @param intent2 the second Intent to compare
+         * @return true if both Intents represent same logical intention,
+         * otherwise false
+         */
+        static boolean equalIntents(PointToPointIntent intent1,
+                                    PointToPointIntent intent2) {
+            return Objects.equals(intent1.appId(), intent2.appId()) &&
+                Objects.equals(intent1.selector(), intent2.selector()) &&
+                Objects.equals(intent1.treatment(), intent2.treatment()) &&
+                Objects.equals(intent1.ingressPoint(), intent2.ingressPoint()) &&
+                Objects.equals(intent1.egressPoint(), intent2.egressPoint());
+        }
+
+        @Override
+        public int hashCode() {
+            if (intent instanceof PointToPointIntent) {
+                PointToPointIntent p2pIntent = (PointToPointIntent) intent;
+                return Objects.hash(p2pIntent.appId(),
+                                    p2pIntent.resources(),
+                                    p2pIntent.selector(),
+                                    p2pIntent.treatment(),
+                                    p2pIntent.constraints(),
+                                    p2pIntent.ingressPoint(),
+                                    p2pIntent.egressPoint());
+            }
+            if (intent instanceof MultiPointToSinglePointIntent) {
+                MultiPointToSinglePointIntent m2pIntent =
+                    (MultiPointToSinglePointIntent) intent;
+                return Objects.hash(m2pIntent.appId(),
+                                    m2pIntent.resources(),
+                                    m2pIntent.selector(),
+                                    m2pIntent.treatment(),
+                                    m2pIntent.constraints(),
+                                    m2pIntent.ingressPoints(),
+                                    m2pIntent.egressPoint());
+            }
+            checkArgument(false, "Intent type not recognized", intent);
+            return 0;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if ((obj == null) || (!(obj instanceof IntentKey))) {
+                return false;
+            }
+            IntentKey other = (IntentKey) obj;
+
+            if (this.intent instanceof PointToPointIntent) {
+                if (!(other.intent instanceof PointToPointIntent)) {
+                    return false;
+                }
+                return equalIntents((PointToPointIntent) this.intent,
+                                    (PointToPointIntent) other.intent);
+            }
+            if (this.intent instanceof MultiPointToSinglePointIntent) {
+                if (!(other.intent instanceof MultiPointToSinglePointIntent)) {
+                    return false;
+                }
+                return equalIntents(
+                                (MultiPointToSinglePointIntent) this.intent,
+                                (MultiPointToSinglePointIntent) other.intent);
+            }
+            checkArgument(false, "Intent type not recognized", intent);
+            return false;
+        }
     }
 }
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/PeerConnectivityManager.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/PeerConnectivityManager.java
index a691ade..3305766 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/PeerConnectivityManager.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/PeerConnectivityManager.java
@@ -16,6 +16,7 @@
 package org.onlab.onos.sdnip;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.onlab.onos.core.ApplicationId;
@@ -24,8 +25,6 @@
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
 import org.onlab.onos.net.flow.TrafficSelector;
 import org.onlab.onos.net.flow.TrafficTreatment;
-import org.onlab.onos.net.intent.Intent;
-import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.net.intent.PointToPointIntent;
 import org.onlab.onos.sdnip.bgp.BgpConstants;
 import org.onlab.onos.sdnip.config.BgpPeer;
@@ -48,9 +47,9 @@
     private static final Logger log = LoggerFactory.getLogger(
             PeerConnectivityManager.class);
 
+    private final IntentSynchronizer intentSynchronizer;
     private final SdnIpConfigService configService;
     private final InterfaceService interfaceService;
-    private final IntentService intentService;
 
     private final ApplicationId appId;
 
@@ -58,18 +57,18 @@
      * Creates a new PeerConnectivityManager.
      *
      * @param appId             the application ID
+     * @param intentSynchronizer the intent synchronizer
      * @param configService     the SDN-IP config service
      * @param interfaceService  the interface service
-     * @param intentService     the intent service
      */
     public PeerConnectivityManager(ApplicationId appId,
+                                   IntentSynchronizer intentSynchronizer,
                                    SdnIpConfigService configService,
-                                   InterfaceService interfaceService,
-                                   IntentService intentService) {
+                                   InterfaceService interfaceService) {
         this.appId = appId;
+        this.intentSynchronizer = intentSynchronizer;
         this.configService = configService;
         this.interfaceService = interfaceService;
-        this.intentService = intentService;
     }
 
     /**
@@ -107,6 +106,8 @@
      * {@link BgpSpeaker}s and all external {@link BgpPeer}s.
      */
     private void setUpConnectivity() {
+        List<PointToPointIntent> intents = new ArrayList<>();
+
         for (BgpSpeaker bgpSpeaker : configService.getBgpSpeakers()
                 .values()) {
             log.debug("Start to set up BGP paths for BGP speaker: {}",
@@ -117,9 +118,12 @@
                 log.debug("Start to set up BGP paths between BGP speaker: {} "
                                   + "to BGP peer: {}", bgpSpeaker, bgpPeer);
 
-                buildPeerIntents(bgpSpeaker, bgpPeer);
+                intents.addAll(buildPeerIntents(bgpSpeaker, bgpPeer));
             }
         }
+
+        // Submit all the intents.
+        intentSynchronizer.submitPeerIntents(intents);
     }
 
     /**
@@ -128,9 +132,12 @@
      *
      * @param bgpSpeaker the BGP speaker
      * @param bgpPeer the BGP peer
+     * @return the intents to install
      */
-    private void buildPeerIntents(BgpSpeaker bgpSpeaker, BgpPeer bgpPeer) {
-        List<Intent> intents = new ArrayList<Intent>();
+    private Collection<PointToPointIntent> buildPeerIntents(
+                                                BgpSpeaker bgpSpeaker,
+                                                BgpPeer bgpPeer) {
+        List<PointToPointIntent> intents = new ArrayList<>();
 
         ConnectPoint bgpdConnectPoint = bgpSpeaker.connectPoint();
 
@@ -142,7 +149,7 @@
 
         if (peerInterface == null) {
             log.error("No interface found for peer {}", bgpPeer.ipAddress());
-            return;
+            return intents;
         }
 
         IpAddress bgpdAddress = null;
@@ -156,7 +163,7 @@
         if (bgpdAddress == null) {
             log.debug("No IP address found for peer {} on interface {}",
                       bgpPeer, bgpPeer.connectPoint());
-            return;
+            return intents;
         }
 
         IpAddress bgpdPeerAddress = bgpPeer.ipAddress();
@@ -231,11 +238,7 @@
         intents.add(new PointToPointIntent(appId, selector, treatment,
                                bgpdPeerConnectPoint, bgpdConnectPoint));
 
-        // Submit all the intents.
-        // TODO submit as a batch
-        for (Intent intent : intents) {
-            intentService.submit(intent);
-        }
+        return intents;
     }
 
     /**
@@ -249,7 +252,8 @@
      * @return the new traffic selector
      */
     private TrafficSelector buildSelector(byte ipProto, IpAddress srcIp,
-                                 IpAddress dstIp, Short srcTcpPort, Short dstTcpPort) {
+                                          IpAddress dstIp, Short srcTcpPort,
+                                          Short dstTcpPort) {
         TrafficSelector.Builder builder = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_IPV4)
                 .matchIPProtocol(ipProto)
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/Router.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/Router.java
index b442b09..f26aab3 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/Router.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/Router.java
@@ -92,18 +92,19 @@
      *
      * @param appId             the application ID
      * @param intentSynchronizer the intent synchronizer
-     * @param hostService       the host service
      * @param configService     the configuration service
      * @param interfaceService  the interface service
+     * @param hostService       the host service
      */
     public Router(ApplicationId appId, IntentSynchronizer intentSynchronizer,
-                  HostService hostService, SdnIpConfigService configService,
-                  InterfaceService interfaceService) {
+                  SdnIpConfigService configService,
+                  InterfaceService interfaceService,
+                  HostService hostService) {
         this.appId = appId;
         this.intentSynchronizer = intentSynchronizer;
-        this.hostService = hostService;
         this.configService = configService;
         this.interfaceService = interfaceService;
+        this.hostService = hostService;
 
         this.hostListener = new InternalHostListener();
 
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 85f9114..5c19c33 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -18,8 +18,6 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -27,6 +25,11 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
 import org.onlab.onos.net.host.HostService;
@@ -35,11 +38,8 @@
 import org.onlab.onos.sdnip.bgp.BgpSession;
 import org.onlab.onos.sdnip.bgp.BgpSessionManager;
 import org.onlab.onos.sdnip.config.SdnIpConfigReader;
-import org.onlab.onos.store.service.Lock;
-import org.onlab.onos.store.service.LockService;
-import org.slf4j.Logger;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
 
 /**
  * Component for the SDN-IP peering application.
@@ -65,55 +65,49 @@
     protected HostService hostService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LockService lockService;
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
 
     private IntentSynchronizer intentSynchronizer;
     private SdnIpConfigReader config;
     private PeerConnectivityManager peerConnectivity;
     private Router router;
     private BgpSessionManager bgpSessionManager;
-
-    private ExecutorService leaderElectionExecutor;
-    private Lock leaderLock;
-    private volatile boolean isShutdown = true;
+    private LeadershipEventListener leadershipEventListener =
+        new InnerLeadershipEventListener();
+    ApplicationId appId;
+    private ControllerNode localControllerNode;
 
     @Activate
     protected void activate() {
         log.info("SDN-IP started");
-        isShutdown = false;
 
-        ApplicationId appId = coreService.registerApplication(SDN_IP_APP);
+        appId = coreService.registerApplication(SDN_IP_APP);
         config = new SdnIpConfigReader();
         config.init();
 
+        localControllerNode = clusterService.getLocalNode();
+
         InterfaceService interfaceService =
             new HostToInterfaceAdaptor(hostService);
 
         intentSynchronizer = new IntentSynchronizer(appId, intentService);
         intentSynchronizer.start();
 
-        peerConnectivity = new PeerConnectivityManager(appId, config,
-                interfaceService, intentService);
+        peerConnectivity = new PeerConnectivityManager(appId,
+                                                       intentSynchronizer,
+                                                       config,
+                                                       interfaceService);
         peerConnectivity.start();
 
-        router = new Router(appId, intentSynchronizer, hostService, config,
-                            interfaceService);
+        router = new Router(appId, intentSynchronizer, config,
+                            interfaceService, hostService);
         router.start();
 
-        leaderLock = lockService.create(SDN_IP_APP + "/sdnIpLeaderLock");
-        leaderElectionExecutor = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder()
-                .setNameFormat("sdnip-leader-election-%d").build());
-        leaderElectionExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-                doLeaderElectionThread();
-            }
-        });
-
-        // Manually set the instance as the leader to allow testing
-        // TODO change this when we get a leader election
-        // intentSynchronizer.leaderChanged(true);
+        leadershipService.addListener(leadershipEventListener);
+        leadershipService.runForLeadership(appId.name());
 
         bgpSessionManager = new BgpSessionManager(router);
         // TODO: the local BGP listen port number should be configurable
@@ -124,17 +118,16 @@
 
     @Deactivate
     protected void deactivate() {
-        isShutdown = true;
 
         bgpSessionManager.stop();
         router.stop();
         peerConnectivity.stop();
         intentSynchronizer.stop();
 
-        // Stop the thread(s)
-        leaderElectionExecutor.shutdownNow();
+        leadershipService.withdraw(appId.name());
+        leadershipService.removeListener(leadershipEventListener);
 
-        log.info("Stopped");
+        log.info("SDN-IP Stopped");
     }
 
     @Override
@@ -162,63 +155,38 @@
     }
 
     /**
-     * Performs the leader election.
+     * A listener for Leadership Events.
      */
-    private void doLeaderElectionThread() {
+    private class InnerLeadershipEventListener
+        implements LeadershipEventListener {
 
-        //
-        // Try to acquire the lock and keep extending it until the instance
-        // is shutdown.
-        //
-        while (!isShutdown) {
-            log.debug("SDN-IP Leader Election begin");
+        @Override
+        public void event(LeadershipEvent event) {
+            log.debug("Leadership Event: time = {} type = {} event = {}",
+                      event.time(), event.type(), event);
 
-            // Block until it becomes the leader
-            try {
-                leaderLock.lock(LEASE_DURATION_MS);
+            if (!event.subject().topic().equals(appId.name())) {
+                return;         // Not our topic: ignore
+            }
+            if (!event.subject().leader().id().equals(
+                        localControllerNode.id())) {
+                return;         // The event is not about this instance: ignore
+            }
 
-                // This instance is the leader
+            switch (event.type()) {
+            case LEADER_ELECTED:
                 log.info("SDN-IP Leader Elected");
                 intentSynchronizer.leaderChanged(true);
-
-                // Keep extending the expiration until shutdown
-                int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
-
-                //
-                // Keep periodically extending the lock expiration.
-                // If there are multiple back-to-back failures to extend (with
-                // extra sleep time between retrials), then release the lock.
-                //
-                while (!isShutdown) {
-                    Thread.sleep(LEASE_DURATION_MS / LEASE_EXTEND_RETRY_MAX);
-                    if (leaderLock.extendExpiration(LEASE_DURATION_MS)) {
-                        log.trace("SDN-IP Leader Extended");
-                        extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX;
-                    } else {
-                        log.debug("SDN-IP Leader Cannot Extend Election");
-                        if (!leaderLock.isLocked()) {
-                            log.debug("SDN-IP Leader Lock Lost");
-                            intentSynchronizer.leaderChanged(false);
-                            break;              // Try again to get the lock
-                        }
-                        extensionFailedCountdown--;
-                        if (extensionFailedCountdown <= 0) {
-                            // Failed too many times to extend.
-                            // Release the lock.
-                            log.debug("SDN-IP Leader Lock Released");
-                            intentSynchronizer.leaderChanged(false);
-                            leaderLock.unlock();
-                            break;              // Try again to get the lock
-                        }
-                    }
-                }
-            } catch (InterruptedException e) {
-                // Thread interrupted. Time to shutdown
-                log.debug("SDN-IP Leader Interrupted");
+                break;
+            case LEADER_BOOTED:
+                log.info("SDN-IP Leader Lost Election");
+                intentSynchronizer.leaderChanged(false);
+                break;
+            case LEADER_REELECTED:
+                break;
+            default:
+                break;
             }
         }
-        // If we reach here, the instance was shutdown
-        intentSynchronizer.leaderChanged(false);
-        leaderLock.unlock();
     }
 }
diff --git a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/IntentSyncTest.java b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/IntentSyncTest.java
index 4b29ba8..3d9cd23 100644
--- a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/IntentSyncTest.java
+++ b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/IntentSyncTest.java
@@ -5,16 +5,23 @@
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reportMatcher;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.easymock.IArgumentMatcher;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
@@ -35,10 +42,14 @@
 import org.onlab.onos.net.host.HostService;
 import org.onlab.onos.net.host.InterfaceIpAddress;
 import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentOperation;
+import org.onlab.onos.net.intent.IntentOperations;
 import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.sdnip.IntentSynchronizer.IntentKey;
 import org.onlab.onos.sdnip.config.Interface;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
@@ -97,8 +108,8 @@
         intentService = createMock(IntentService.class);
 
         intentSynchronizer = new IntentSynchronizer(APPID, intentService);
-        router = new Router(APPID, intentSynchronizer,
-                hostService, null, interfaceService);
+        router = new Router(APPID, intentSynchronizer, null, interfaceService,
+                            hostService);
     }
 
     /**
@@ -263,17 +274,16 @@
         // Compose a intent, which is equal to intent5 but the id is different.
         MultiPointToSinglePointIntent intent5New =
                 staticIntentBuilder(intent5, routeEntry5, "00:00:00:00:00:01");
-        assertTrue(TestUtils.callMethod(intentSynchronizer,
-                "compareMultiPointToSinglePointIntents",
-                new Class<?>[] {MultiPointToSinglePointIntent.class,
-                MultiPointToSinglePointIntent.class},
-                intent5, intent5New).equals(true));
+        assertThat(IntentSynchronizer.IntentKey.equalIntents(
+                        intent5, intent5New),
+                   is(true));
         assertFalse(intent5.equals(intent5New));
 
         MultiPointToSinglePointIntent intent6 = intentBuilder(
                 routeEntry6.prefix(), "00:00:00:00:00:01",  SW1_ETH1);
 
-        // Set up the bgpRoutes and pushedRouteIntents fields in Router class
+        // Set up the bgpRoutes field in Router class and routeIntents fields
+        // in IntentSynchronizer class
         InvertedRadixTree<RouteEntry> bgpRoutes =
                 new ConcurrentInvertedRadixTree<>(
                 new DefaultByteArrayNodeFactory());
@@ -292,15 +302,14 @@
         TestUtils.setField(router, "bgpRoutes", bgpRoutes);
 
         ConcurrentHashMap<Ip4Prefix, MultiPointToSinglePointIntent>
-        pushedRouteIntents =  new ConcurrentHashMap<>();
-        pushedRouteIntents.put(routeEntry1.prefix(), intent1);
-        pushedRouteIntents.put(routeEntry3.prefix(), intent3);
-        pushedRouteIntents.put(routeEntry4Update.prefix(), intent4Update);
-        pushedRouteIntents.put(routeEntry5.prefix(), intent5New);
-        pushedRouteIntents.put(routeEntry6.prefix(), intent6);
-        pushedRouteIntents.put(routeEntry7.prefix(), intent7);
-        TestUtils.setField(intentSynchronizer, "pushedRouteIntents",
-                           pushedRouteIntents);
+        routeIntents =  new ConcurrentHashMap<>();
+        routeIntents.put(routeEntry1.prefix(), intent1);
+        routeIntents.put(routeEntry3.prefix(), intent3);
+        routeIntents.put(routeEntry4Update.prefix(), intent4Update);
+        routeIntents.put(routeEntry5.prefix(), intent5New);
+        routeIntents.put(routeEntry6.prefix(), intent6);
+        routeIntents.put(routeEntry7.prefix(), intent7);
+        TestUtils.setField(intentSynchronizer, "routeIntents", routeIntents);
 
         // Set up expectation
         reset(intentService);
@@ -322,18 +331,26 @@
                 .andReturn(IntentState.WITHDRAWING).anyTimes();
         expect(intentService.getIntents()).andReturn(intents).anyTimes();
 
-        intentService.withdraw(intent2);
-        intentService.submit(intent3);
-        intentService.withdraw(intent4);
-        intentService.submit(intent4Update);
-        intentService.submit(intent6);
-        intentService.submit(intent7);
+        IntentOperations.Builder builder = IntentOperations.builder();
+        builder.addWithdrawOperation(intent2.id());
+        builder.addWithdrawOperation(intent4.id());
+        intentService.execute(eqExceptId(builder.build()));
+
+        builder = IntentOperations.builder();
+        builder.addSubmitOperation(intent3);
+        builder.addSubmitOperation(intent4Update);
+        builder.addSubmitOperation(intent6);
+        builder.addSubmitOperation(intent7);
+        intentService.execute(eqExceptId(builder.build()));
         replay(intentService);
 
         // Start the test
         intentSynchronizer.leaderChanged(true);
-        TestUtils.callMethod(intentSynchronizer, "syncIntents",
+        /*
+        TestUtils.callMethod(intentSynchronizer, "synchronizeIntents",
                              new Class<?>[] {});
+        */
+        intentSynchronizer.synchronizeIntents();
 
         // Verify
         assertEquals(router.getRoutes().size(), 6);
@@ -343,12 +360,12 @@
         assertTrue(router.getRoutes().contains(routeEntry5));
         assertTrue(router.getRoutes().contains(routeEntry6));
 
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 6);
-        assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent1));
-        assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent3));
-        assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent4Update));
-        assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent5));
-        assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent6));
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 6);
+        assertTrue(intentSynchronizer.getRouteIntents().contains(intent1));
+        assertTrue(intentSynchronizer.getRouteIntents().contains(intent3));
+        assertTrue(intentSynchronizer.getRouteIntents().contains(intent4Update));
+        assertTrue(intentSynchronizer.getRouteIntents().contains(intent5));
+        assertTrue(intentSynchronizer.getRouteIntents().contains(intent6));
 
         verify(intentService);
     }
@@ -410,4 +427,129 @@
                 "ingressPoints", intent.ingressPoints());
         return intentNew;
     }
+
+    /*
+     * EasyMock matcher that matches {@link IntenOperations} but
+     * ignores the {@link IntentId} when matching.
+     * <p/>
+     * The normal intent equals method tests that the intent IDs are equal,
+     * however in these tests we can't know what the intent IDs will be in
+     * advance, so we can't set up expected intents with the correct IDs. Thus,
+     * the solution is to use an EasyMock matcher that verifies that all the
+     * value properties of the provided intent match the expected values, but
+     * ignores the intent ID when testing equality.
+     */
+    private static final class IdAgnosticIntentOperationsMatcher implements
+                IArgumentMatcher {
+
+        private final IntentOperations intentOperations;
+        private String providedString;
+
+        /**
+         * Constructor taking the expected intent operations to match against.
+         *
+         * @param intentOperations the expected intent operations
+         */
+        public IdAgnosticIntentOperationsMatcher(
+                        IntentOperations intentOperations) {
+            this.intentOperations = intentOperations;
+        }
+
+        @Override
+        public void appendTo(StringBuffer strBuffer) {
+            strBuffer.append("IntentOperationsMatcher unable to match: "
+                    + providedString);
+        }
+
+        @Override
+        public boolean matches(Object object) {
+            if (!(object instanceof IntentOperations)) {
+                return false;
+            }
+
+            IntentOperations providedIntentOperations =
+                (IntentOperations) object;
+            providedString = providedIntentOperations.toString();
+
+            List<IntentKey> thisSubmitIntents = new LinkedList<>();
+            List<IntentId> thisWithdrawIntentIds = new LinkedList<>();
+            List<IntentKey> thisReplaceIntents = new LinkedList<>();
+            List<IntentKey> thisUpdateIntents = new LinkedList<>();
+            List<IntentKey> providedSubmitIntents = new LinkedList<>();
+            List<IntentId> providedWithdrawIntentIds = new LinkedList<>();
+            List<IntentKey> providedReplaceIntents = new LinkedList<>();
+            List<IntentKey> providedUpdateIntents = new LinkedList<>();
+
+            extractIntents(intentOperations, thisSubmitIntents,
+                           thisWithdrawIntentIds, thisReplaceIntents,
+                           thisUpdateIntents);
+            extractIntents(providedIntentOperations, providedSubmitIntents,
+                           providedWithdrawIntentIds, providedReplaceIntents,
+                           providedUpdateIntents);
+
+            return CollectionUtils.isEqualCollection(thisSubmitIntents,
+                                                     providedSubmitIntents) &&
+                CollectionUtils.isEqualCollection(thisWithdrawIntentIds,
+                                                  providedWithdrawIntentIds) &&
+                CollectionUtils.isEqualCollection(thisUpdateIntents,
+                                                  providedUpdateIntents) &&
+                CollectionUtils.isEqualCollection(thisReplaceIntents,
+                                                  providedReplaceIntents);
+        }
+
+        /**
+         * Extracts the intents per operation type. Each intent is encapsulated
+         * in IntentKey so it can be compared by excluding the Intent ID.
+         *
+         * @param intentOperations the container with the intent operations
+         * to extract the intents from
+         * @param submitIntents the SUBMIT intents
+         * @param withdrawIntentIds the WITHDRAW intents IDs
+         * @param replaceIntents the REPLACE intents
+         * @param updateIntents the UPDATE intens
+         */
+        private void extractIntents(IntentOperations intentOperations,
+                                    List<IntentKey> submitIntents,
+                                    List<IntentId> withdrawIntentIds,
+                                    List<IntentKey> replaceIntents,
+                                    List<IntentKey> updateIntents) {
+            for (IntentOperation oper : intentOperations.operations()) {
+                IntentId intentId;
+                IntentKey intentKey;
+                switch (oper.type()) {
+                case SUBMIT:
+                    intentKey = new IntentKey(oper.intent());
+                    submitIntents.add(intentKey);
+                    break;
+                case WITHDRAW:
+                    intentId = oper.intentId();
+                    withdrawIntentIds.add(intentId);
+                    break;
+                case REPLACE:
+                    intentKey = new IntentKey(oper.intent());
+                    replaceIntents.add(intentKey);
+                    break;
+                case UPDATE:
+                    intentKey = new IntentKey(oper.intent());
+                    updateIntents.add(intentKey);
+                    break;
+                default:
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Matcher method to set an expected intent to match against (ignoring the
+     * the intent ID).
+     *
+     * @param intent the expected intent
+     * @return something of type IntentOperations
+     */
+    private static IntentOperations eqExceptId(
+                IntentOperations intentOperations) {
+        reportMatcher(new IdAgnosticIntentOperationsMatcher(intentOperations));
+        return intentOperations;
+    }
 }
diff --git a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/PeerConnectivityManagerTest.java b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/PeerConnectivityManagerTest.java
index d21bb2e..b7cdbc0 100644
--- a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/PeerConnectivityManagerTest.java
+++ b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/PeerConnectivityManagerTest.java
@@ -20,6 +20,8 @@
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.junit.TestUtils.TestUtilsException;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.DeviceId;
@@ -70,9 +72,10 @@
     };
 
     private PeerConnectivityManager peerConnectivityManager;
-    private IntentService intentService;
+    private IntentSynchronizer intentSynchronizer;
     private SdnIpConfigService configInfoService;
     private InterfaceService interfaceService;
+    private IntentService intentService;
 
     private Map<String, BgpSpeaker> bgpSpeakers;
     private Map<String, Interface> interfaces;
@@ -525,8 +528,10 @@
 
     /**
      * Initializes peer connectivity testing environment.
+     *
+     * @throws TestUtilsException if exceptions when using TestUtils
      */
-    private void initPeerConnectivity() {
+    private void initPeerConnectivity() throws TestUtilsException {
 
         configInfoService = createMock(SdnIpConfigService.class);
         expect(configInfoService.getBgpPeers()).andReturn(peers).anyTimes();
@@ -536,8 +541,13 @@
         intentService = createMock(IntentService.class);
         replay(intentService);
 
-        peerConnectivityManager = new PeerConnectivityManager(APPID, configInfoService,
-                interfaceService, intentService);
+        intentSynchronizer = new IntentSynchronizer(APPID, intentService);
+        intentSynchronizer.leaderChanged(true);
+        TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
+
+        peerConnectivityManager =
+            new PeerConnectivityManager(APPID, intentSynchronizer,
+                                        configInfoService, interfaceService);
     }
 
     /*
diff --git a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTest.java b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTest.java
index b5beb4a..db21d65 100644
--- a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTest.java
+++ b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTest.java
@@ -115,8 +115,8 @@
         intentService = createMock(IntentService.class);
 
         intentSynchronizer = new IntentSynchronizer(APPID, intentService);
-        router = new Router(APPID, intentSynchronizer,
-                hostService, sdnIpConfigService, interfaceService);
+        router = new Router(APPID, intentSynchronizer, sdnIpConfigService,
+                            interfaceService, hostService);
     }
 
     /**
@@ -267,8 +267,8 @@
         // Verify
         assertEquals(router.getRoutes().size(), 1);
         assertTrue(router.getRoutes().contains(routeEntry));
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 1);
+        assertEquals(intentSynchronizer.getRouteIntents().iterator().next(),
                 intent);
         verify(intentService);
     }
@@ -347,8 +347,8 @@
         // Verify
         assertEquals(router.getRoutes().size(), 1);
         assertTrue(router.getRoutes().contains(routeEntryUpdate));
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 1);
+        assertEquals(intentSynchronizer.getRouteIntents().iterator().next(),
                 intentNew);
         verify(intentService);
     }
@@ -397,7 +397,7 @@
 
         // Verify
         assertEquals(router.getRoutes().size(), 0);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 0);
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 0);
         verify(intentService);
     }
 
@@ -425,7 +425,7 @@
         // Verify
         assertEquals(router.getRoutes().size(), 1);
         assertTrue(router.getRoutes().contains(routeEntry));
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 0);
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 0);
         verify(intentService);
     }
 }
diff --git a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTestWithAsyncArp.java b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTestWithAsyncArp.java
index 3f147cc..b4389c8 100644
--- a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTestWithAsyncArp.java
+++ b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/RouterTestWithAsyncArp.java
@@ -117,7 +117,7 @@
 
         intentSynchronizer = new IntentSynchronizer(APPID, intentService);
         router = new Router(APPID, intentSynchronizer,
-                hostService, sdnIpConfigService, interfaceService);
+                            sdnIpConfigService, interfaceService, hostService);
         internalHostListener = router.new InternalHostListener();
     }
 
@@ -229,8 +229,8 @@
         // Verify
         assertEquals(router.getRoutes().size(), 1);
         assertTrue(router.getRoutes().contains(routeEntry));
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 1);
+        assertEquals(intentSynchronizer.getRouteIntents().iterator().next(),
                      intent);
         verify(intentService);
         verify(hostService);
@@ -254,9 +254,9 @@
         MultiPointToSinglePointIntent intent = staticIntentBuilder();
 
         // Set up the bgpRoutes field of Router class with existing route, and
-        // pushedRouteIntents field with the corresponding existing intent
+        // routeIntents field with the corresponding existing intent
         setBgpRoutesField(routeEntry);
-        setPushedRouteIntentsField(routeEntry, intent);
+        setRouteIntentsField(routeEntry, intent);
 
         // Start to construct a new route entry and new intent
         RouteEntry routeEntryUpdate = new RouteEntry(
@@ -312,8 +312,8 @@
         // Verify
         assertEquals(router.getRoutes().size(), 1);
         assertTrue(router.getRoutes().contains(routeEntryUpdate));
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 1);
+        assertEquals(intentSynchronizer.getRouteIntents().iterator().next(),
                 intentNew);
         verify(intentService);
         verify(hostService);
@@ -334,9 +334,9 @@
         MultiPointToSinglePointIntent intent = staticIntentBuilder();
 
         // Set up the bgpRoutes field of Router class with existing route, and
-        // pushedRouteIntents field with the corresponding existing intent
+        // routeIntents field with the corresponding existing intent
         setBgpRoutesField(routeEntry);
-        setPushedRouteIntentsField(routeEntry, intent);
+        setRouteIntentsField(routeEntry, intent);
 
         // Set up expectation
         reset(intentService);
@@ -350,7 +350,7 @@
 
         // Verify
         assertEquals(router.getRoutes().size(), 0);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 0);
+        assertEquals(intentSynchronizer.getRouteIntents().size(), 0);
         verify(intentService);
     }
 
@@ -397,17 +397,17 @@
     }
 
     /**
-     * Sets pushedRouteIntentsField in Router class.
+     * Sets routeIntentsField in IntentSynchronizer class.
      *
      * @throws TestUtilsException
      */
-    private void setPushedRouteIntentsField(RouteEntry routeEntry,
+    private void setRouteIntentsField(RouteEntry routeEntry,
             MultiPointToSinglePointIntent intent)
             throws TestUtilsException {
 
         ConcurrentHashMap<Ip4Prefix, MultiPointToSinglePointIntent>
-            pushedRouteIntents =  new ConcurrentHashMap<>();
-        pushedRouteIntents.put(routeEntry.prefix(), intent);
-        TestUtils.setField(router, "pushedRouteIntents", pushedRouteIntents);
+            routeIntents =  new ConcurrentHashMap<>();
+        routeIntents.put(routeEntry.prefix(), intent);
+        TestUtils.setField(intentSynchronizer, "routeIntents", routeIntents);
     }
 }
\ No newline at end of file
diff --git a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/SdnIpTest.java b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/SdnIpTest.java
index 0cf706a..f83f846 100644
--- a/apps/sdnip/src/test/java/org/onlab/onos/sdnip/SdnIpTest.java
+++ b/apps/sdnip/src/test/java/org/onlab/onos/sdnip/SdnIpTest.java
@@ -113,8 +113,8 @@
         random = new Random();
 
         intentSynchronizer = new IntentSynchronizer(APPID, intentService);
-        router = new Router(APPID, intentSynchronizer, hostService,
-                sdnIpConfigService, interfaceService);
+        router = new Router(APPID, intentSynchronizer, sdnIpConfigService,
+                            interfaceService, hostService);
     }
 
     /**
@@ -241,7 +241,7 @@
         latch.await(5000, TimeUnit.MILLISECONDS);
 
         assertEquals(router.getRoutes().size(), numRoutes);
-        assertEquals(intentSynchronizer.getPushedRouteIntents().size(),
+        assertEquals(intentSynchronizer.getRouteIntents().size(),
                      numRoutes);
 
         verify(intentService);
@@ -317,7 +317,7 @@
         deleteCount.await(5000, TimeUnit.MILLISECONDS);
 
         assertEquals(0, router.getRoutes().size());
-        assertEquals(0, intentSynchronizer.getPushedRouteIntents().size());
+        assertEquals(0, intentSynchronizer.getRouteIntents().size());
         verify(intentService);
     }
 
diff --git a/pom.xml b/pom.xml
index 508b681..eec0ad7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-collections4</artifactId>
+                <version>4.0</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.codehaus.jackson</groupId>
                 <artifactId>jackson-core-asl</artifactId>
                 <version>1.9.13</version>