Extend FPM module to handle routes from multiple peers.
This has the side effect of fixing a bug when the same peer changes its
route advertisement for a particular prefix: the route it previously
advertised for that prefix is now withdrawn when it is replaced.
Change-Id: I09af3baf0a7741919be2a2986112db6db2556666
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index 018b1bb..cd24cc6 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -16,13 +16,17 @@
package org.onosproject.routing.fpm;
-import java.net.SocketAddress;
import java.util.Map;
/**
- * Created by jono on 2/2/16.
+ * Provides information about the FPM route receiver module.
*/
public interface FpmInfoService {
- Map<SocketAddress, Long> peers();
+ /**
+ * Returns the FPM peers that are currently connected.
+ *
+ * @return a map from each connected FPM peer to the time its connection
+ * was admitted (FpmManager records System.currentTimeMillis() for this)
+ */
+ Map<FpmPeer, Long> peers();
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
index 37fe1f7..3092ae0 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
@@ -18,8 +18,6 @@
import org.onosproject.routing.fpm.protocol.FpmHeader;
-import java.net.SocketAddress;
-
/**
* Listener for events from the route source.
*/
@@ -28,22 +26,23 @@
/**
* Handles an FPM message.
*
+ * @param peer FPM peer the message was received from
* @param fpmMessage FPM message
*/
- void fpmMessage(FpmHeader fpmMessage);
+ void fpmMessage(FpmPeer peer, FpmHeader fpmMessage);
/**
* Signifies that a new peer has attempted to initiate an FPM connection.
*
- * @param address remote address of the peer
+ * @param peer FPM peer that initiated the connection
* @return true if the connection should be admitted, otherwise false
*/
- boolean peerConnected(SocketAddress address);
+ boolean peerConnected(FpmPeer peer);
/**
* Signifies that an FPM connection has been disconnected.
*
- * @param address remote address of the peer
+ * @param peer FPM peer whose connection went down
*/
- void peerDisconnected(SocketAddress address);
+ void peerDisconnected(FpmPeer peer);
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index bb1448f..9019960 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -16,7 +16,6 @@
package org.onosproject.routing.fpm;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -54,7 +53,6 @@
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.Dictionary;
import java.util.LinkedList;
import java.util.List;
@@ -84,9 +82,9 @@
private Channel serverChannel;
private ChannelGroup allChannels = new DefaultChannelGroup();
- private Map<SocketAddress, Long> peers = new ConcurrentHashMap<>();
+ private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
- private Map<IpPrefix, Route> fpmRoutes = new ConcurrentHashMap<>();
+ private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
@Property(name = "clearRoutes", boolValue = true,
label = "Whether to clear routes when the FPM connection goes down")
@@ -171,11 +169,11 @@
}
if (clearRoutes) {
- clearRoutes();
+ peers.keySet().forEach(this::clearRoutes);
}
}
- private void fpmMessage(FpmHeader fpmMessage) {
+ private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
Netlink netlink = fpmMessage.netlink();
RtNetlink rtNetlink = netlink.rtNetlink();
@@ -221,12 +219,17 @@
}
route = new Route(Route.Source.FPM, prefix, gateway);
- fpmRoutes.put(prefix, route);
+ Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
+
+ if (oldRoute != null) {
+ log.trace("Swapping {} with {}", oldRoute, route);
+ withdraws.add(oldRoute);
+ }
updates.add(route);
break;
case RTM_DELROUTE:
- Route existing = fpmRoutes.remove(prefix);
+ Route existing = fpmRoutes.get(peer).remove(prefix);
if (existing == null) {
log.warn("Got delete for non-existent prefix");
return;
@@ -246,41 +249,45 @@
}
- private void clearRoutes() {
- log.info("Clearing all routes");
- routeService.withdraw(ImmutableList.copyOf(fpmRoutes.values()));
+ /**
+ * Forgets every route learned from the given peer and withdraws those
+ * routes from the route service.
+ *
+ * @param peer FPM peer whose routes are to be cleared
+ */
+ private void clearRoutes(FpmPeer peer) {
+ log.info("Clearing all routes for peer {}", peer);
+ Map<IpPrefix, Route> learned = fpmRoutes.remove(peer);
+ if (learned == null) {
+ // No route table is registered for this peer: nothing to withdraw.
+ return;
+ }
+ routeService.withdraw(learned.values());
}
@Override
- public Map<SocketAddress, Long> peers() {
+ public Map<FpmPeer, Long> peers() {
+ // Hand out an immutable copy rather than the live, mutable peer map.
return ImmutableMap.copyOf(peers);
}
+ /**
+ * Listener handed to the FPM session handlers. Forwards FPM messages to
+ * FpmManager and keeps the peer map and the per-peer route tables in step
+ * with connections coming up and going down.
+ */
private class InternalFpmListener implements FpmListener {
@Override
- public void fpmMessage(FpmHeader fpmMessage) {
- FpmManager.this.fpmMessage(fpmMessage);
+ public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
+ FpmManager.this.fpmMessage(peer, fpmMessage);
}
@Override
- public boolean peerConnected(SocketAddress address) {
- if (peers.keySet().contains(address)) {
+ public boolean peerConnected(FpmPeer peer) {
+ // putIfAbsent performs the duplicate check and the registration as one
+ // atomic step; a separate contains() followed by put() would let two
+ // racing connections from the same peer both be admitted.
+ if (peers.putIfAbsent(peer, System.currentTimeMillis()) != null) {
return false;
}
- peers.put(address, System.currentTimeMillis());
+ // fpmMessage() dereferences fpmRoutes.get(peer) without a null check,
+ // so an admitted peer must always have a route table.
+ fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
return true;
}
@Override
- public void peerDisconnected(SocketAddress address) {
- log.info("FPM connection to {} went down", address);
+ public void peerDisconnected(FpmPeer peer) {
+ log.info("FPM connection to {} went down", peer);
if (clearRoutes) {
- clearRoutes();
+ clearRoutes(peer);
}
- peers.remove(address);
+ peers.remove(peer);
}
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
new file mode 100644
index 0000000..d9a11fc
--- /dev/null
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm;
+
+import org.onlab.packet.IpAddress;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Represents an FPM peer, identified by its IP address and TCP port.
+ * <p>
+ * Equality and hash code are based on both fields, which makes instances
+ * usable as map keys.
+ */
+public class FpmPeer {
+
+ private final IpAddress address;
+ private final int port;
+
+ /**
+ * Creates a new FPM peer.
+ *
+ * @param address peer IP address
+ * @param port peer TCP port number
+ */
+ public FpmPeer(IpAddress address, int port) {
+ this.address = address;
+ this.port = port;
+ }
+
+ /**
+ * Returns the peer's IP address.
+ *
+ * @return IP address
+ */
+ public IpAddress address() {
+ return address;
+ }
+
+ /**
+ * Returns the peer's TCP port number.
+ *
+ * @return port number
+ */
+ public int port() {
+ return port;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(address, port);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof FpmPeer)) {
+ return false;
+ }
+
+ FpmPeer that = (FpmPeer) other;
+
+ // port is a primitive int: compare it directly instead of boxing it via Objects.equals
+ return this.port == that.port &&
+ Objects.equals(this.address, that.address);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("address", address)
+ .add("port", port)
+ .toString();
+ }
+
+ /**
+ * Creates an FPM peer from the IP address and port of a socket address.
+ *
+ * @param address socket address of the peer
+ * @return FPM peer
+ */
+ public static FpmPeer fromSocketAddress(InetSocketAddress address) {
+ return new FpmPeer(IpAddress.valueOf(address.getAddress()), address.getPort());
+ }
+}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
index 33f845e..f6e982f 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
@@ -26,6 +26,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -38,6 +41,7 @@
private final FpmListener fpmListener;
private Channel channel;
+ private FpmPeer us;
/**
* Class constructor.
@@ -52,7 +56,7 @@
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
FpmHeader fpmMessage = (FpmHeader) e.getMessage();
- fpmListener.fpmMessage(fpmMessage);
+ fpmListener.fpmMessage(us, fpmMessage);
}
@Override
@@ -68,7 +72,15 @@
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
- if (!fpmListener.peerConnected(ctx.getChannel().getRemoteAddress())) {
+ SocketAddress socketAddress = ctx.getChannel().getRemoteAddress();
+
+ if (!(socketAddress instanceof InetSocketAddress)) {
+ throw new IllegalStateException("Address type is not InetSocketAddress");
+ }
+
+ us = FpmPeer.fromSocketAddress((InetSocketAddress) socketAddress);
+
+ if (!fpmListener.peerConnected(us)) {
log.error("Received new FPM connection while already connected");
ctx.getChannel().close();
return;
@@ -94,7 +106,9 @@
}
private void handleDisconnect() {
- fpmListener.peerDisconnected(channel.getRemoteAddress());
+ // us is only assigned in channelOpen once the remote address has been
+ // turned into an FpmPeer; if that never happened there is no peer to report.
+ // NOTE(review): channelOpen assigns us before peerConnected() is consulted,
+ // so if this runs for a connection that was rejected it still reports
+ // peerDisconnected(us) for that peer - confirm this is intended.
+ if (us != null) {
+ fpmListener.peerDisconnected(us);
+ }
channel = null;
}
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
index 86dd44c..5a398b8 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
@@ -21,8 +21,6 @@
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.routing.fpm.FpmInfoService;
-import java.net.InetSocketAddress;
-
/**
* Displays the current FPM connections.
*/
@@ -36,14 +34,8 @@
protected void execute() {
FpmInfoService fpmInfo = AbstractShellCommand.get(FpmInfoService.class);
- fpmInfo.peers().forEach((socketAddress, timestamp) -> {
- if (socketAddress instanceof InetSocketAddress) {
- InetSocketAddress inet = (InetSocketAddress) socketAddress;
-
- print(FORMAT, inet.getHostString(), inet.getPort(), Tools.timeAgo(timestamp));
- } else {
- print("Unknown data format");
- }
+ fpmInfo.peers().forEach((peer, timestamp) -> {
+ print(FORMAT, peer.address(), peer.port(), Tools.timeAgo(timestamp));
});
}
}