Extend FPM module to handle routes from multiple peers.

This has the side-effect of fixing a bug when the same peer changes its
route advertisement for a particular prefix.

Change-Id: I09af3baf0a7741919be2a2986112db6db2556666
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index 018b1bb..cd24cc6 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -16,13 +16,17 @@
 
 package org.onosproject.routing.fpm;
 
-import java.net.SocketAddress;
 import java.util.Map;
 
 /**
- * Created by jono on 2/2/16.
+ * Provides information about the FPM route receiver module.
  */
 public interface FpmInfoService {
 
-    Map<SocketAddress, Long> peers();
+    /**
+     * Returns the FPM peers that are currently connected.
+     *
+     * @return a map of FPM peer to connection time.
+     */
+    Map<FpmPeer, Long> peers();
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
index 37fe1f7..3092ae0 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
@@ -18,8 +18,6 @@
 
 import org.onosproject.routing.fpm.protocol.FpmHeader;
 
-import java.net.SocketAddress;
-
 /**
  * Listener for events from the route source.
  */
@@ -28,22 +26,23 @@
     /**
      * Handles an FPM message.
      *
+     * @param peer FPM peer
      * @param fpmMessage FPM message
      */
-    void fpmMessage(FpmHeader fpmMessage);
+    void fpmMessage(FpmPeer peer, FpmHeader fpmMessage);
 
     /**
      * Signifies that a new peer has attempted to initiate an FPM connection.
      *
-     * @param address remote address of the peer
+     * @param peer FPM peer
      * @return true if the connection should be admitted, otherwise false
      */
-    boolean peerConnected(SocketAddress address);
+    boolean peerConnected(FpmPeer peer);
 
     /**
      * Signifies that an FPM connection has been disconnected.
      *
-     * @param address remote address of the peer
+     * @param peer FPM peer
      */
-    void peerDisconnected(SocketAddress address);
+    void peerDisconnected(FpmPeer peer);
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index bb1448f..9019960 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.routing.fpm;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -54,7 +53,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.util.Dictionary;
 import java.util.LinkedList;
 import java.util.List;
@@ -84,9 +82,9 @@
     private Channel serverChannel;
     private ChannelGroup allChannels = new DefaultChannelGroup();
 
-    private Map<SocketAddress, Long> peers = new ConcurrentHashMap<>();
+    private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
 
-    private Map<IpPrefix, Route> fpmRoutes = new ConcurrentHashMap<>();
+    private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
 
     @Property(name = "clearRoutes", boolValue = true,
             label = "Whether to clear routes when the FPM connection goes down")
@@ -171,11 +169,11 @@
         }
 
         if (clearRoutes) {
-            clearRoutes();
+            peers.keySet().forEach(this::clearRoutes);
         }
     }
 
-    private void fpmMessage(FpmHeader fpmMessage) {
+    private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
         Netlink netlink = fpmMessage.netlink();
         RtNetlink rtNetlink = netlink.rtNetlink();
 
@@ -221,12 +219,17 @@
             }
             route = new Route(Route.Source.FPM, prefix, gateway);
 
-            fpmRoutes.put(prefix, route);
 
+            Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
+
+            if (oldRoute != null) {
+                log.trace("Swapping {} with {}", oldRoute, route);
+                withdraws.add(oldRoute);
+            }
             updates.add(route);
             break;
         case RTM_DELROUTE:
-            Route existing = fpmRoutes.remove(prefix);
+            Route existing = fpmRoutes.get(peer).remove(prefix);
             if (existing == null) {
                 log.warn("Got delete for non-existent prefix");
                 return;
@@ -246,41 +249,45 @@
     }
 
 
-    private void clearRoutes() {
-        log.info("Clearing all routes");
-        routeService.withdraw(ImmutableList.copyOf(fpmRoutes.values()));
+    private void clearRoutes(FpmPeer peer) {
+        log.info("Clearing all routes for peer {}", peer);
+        Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
+        if (routes != null) {
+            routeService.withdraw(routes.values());
+        }
     }
 
     @Override
-    public Map<SocketAddress, Long> peers() {
+    public Map<FpmPeer, Long> peers() {
         return ImmutableMap.copyOf(peers);
     }
 
     private class InternalFpmListener implements FpmListener {
         @Override
-        public void fpmMessage(FpmHeader fpmMessage) {
-            FpmManager.this.fpmMessage(fpmMessage);
+        public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
+            FpmManager.this.fpmMessage(peer, fpmMessage);
         }
 
         @Override
-        public boolean peerConnected(SocketAddress address) {
-            if (peers.keySet().contains(address)) {
+        public boolean peerConnected(FpmPeer peer) {
+            if (peers.keySet().contains(peer)) {
                 return false;
             }
 
-            peers.put(address, System.currentTimeMillis());
+            peers.put(peer, System.currentTimeMillis());
+            fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
             return true;
         }
 
         @Override
-        public void peerDisconnected(SocketAddress address) {
-            log.info("FPM connection to {} went down", address);
+        public void peerDisconnected(FpmPeer peer) {
+            log.info("FPM connection to {} went down", peer);
 
             if (clearRoutes) {
-                clearRoutes();
+                clearRoutes(peer);
             }
 
-            peers.remove(address);
+            peers.remove(peer);
         }
     }
 
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
new file mode 100644
index 0000000..d9a11fc
--- /dev/null
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm;
+
+import org.onlab.packet.IpAddress;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Represents an FPM peer.
+ */
+public class FpmPeer {
+
+    private final IpAddress address;
+    private final int port;
+
+    /**
+     * Creates a new FPM peer.
+     *
+     * @param address peer IP address
+     * @param port peer TCP port number
+     */
+    public FpmPeer(IpAddress address, int port) {
+        this.address = address;
+        this.port = port;
+    }
+
+    /**
+     * Returns the peers IP address.
+     *
+     * @return IP address
+     */
+    public IpAddress address() {
+        return address;
+    }
+
+    /**
+     * Returns the peer port number.
+     *
+     * @return port number
+     */
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(address, port);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof FpmPeer)) {
+            return false;
+        }
+
+        FpmPeer that = (FpmPeer) other;
+
+        return Objects.equals(this.address, that.address) &&
+                Objects.equals(this.port, that.port);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("address", address)
+                .add("port", port)
+                .toString();
+    }
+
+    public static FpmPeer fromSocketAddress(InetSocketAddress address) {
+        return new FpmPeer(IpAddress.valueOf(address.getAddress()), address.getPort());
+    }
+}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
index 33f845e..f6e982f 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
@@ -26,6 +26,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -38,6 +41,7 @@
     private final FpmListener fpmListener;
 
     private Channel channel;
+    private FpmPeer us;
 
     /**
      * Class constructor.
@@ -52,7 +56,7 @@
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
             throws Exception {
         FpmHeader fpmMessage = (FpmHeader) e.getMessage();
-        fpmListener.fpmMessage(fpmMessage);
+        fpmListener.fpmMessage(us, fpmMessage);
     }
 
     @Override
@@ -68,7 +72,15 @@
     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
             throws Exception {
-        if (!fpmListener.peerConnected(ctx.getChannel().getRemoteAddress())) {
+        SocketAddress socketAddress = ctx.getChannel().getRemoteAddress();
+
+        if (!(socketAddress instanceof InetSocketAddress)) {
+            throw new IllegalStateException("Address type is not InetSocketAddress");
+        }
+
+        us = FpmPeer.fromSocketAddress((InetSocketAddress) socketAddress);
+
+        if (!fpmListener.peerConnected(us)) {
             log.error("Received new FPM connection while already connected");
             ctx.getChannel().close();
             return;
@@ -94,7 +106,9 @@
     }
 
     private void handleDisconnect() {
-        fpmListener.peerDisconnected(channel.getRemoteAddress());
+        if (us != null) {
+            fpmListener.peerDisconnected(us);
+        }
         channel = null;
     }
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
index 86dd44c..5a398b8 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
@@ -21,8 +21,6 @@
 import org.onosproject.cli.AbstractShellCommand;
 import org.onosproject.routing.fpm.FpmInfoService;
 
-import java.net.InetSocketAddress;
-
 /**
  * Displays the current FPM connections.
  */
@@ -36,14 +34,8 @@
     protected void execute() {
         FpmInfoService fpmInfo = AbstractShellCommand.get(FpmInfoService.class);
 
-        fpmInfo.peers().forEach((socketAddress, timestamp) -> {
-            if (socketAddress instanceof InetSocketAddress) {
-                InetSocketAddress inet = (InetSocketAddress) socketAddress;
-
-                print(FORMAT, inet.getHostString(), inet.getPort(), Tools.timeAgo(timestamp));
-            } else {
-                print("Unknown data format");
-            }
+        fpmInfo.peers().forEach((peer, timestamp) -> {
+            print(FORMAT, peer.address(), peer.port(), Tools.timeAgo(timestamp));
         });
     }
 }