[ONOS-5607] Revise LISP ctrl impl class to track msgs and routers
Change-Id: I4a51a8ef9162e3feee543f40fa92a0435186d1c9
diff --git a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java
index 29af596..449983a 100644
--- a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java
+++ b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java
@@ -48,6 +48,7 @@
private EventLoopGroup eventLoopGroup;
private Class<? extends AbstractChannel> channelClass;
+ private List<ChannelFuture> channelFutures = Lists.newArrayList();
/**
* Stitches all channel handlers into server bootstrap.
@@ -59,8 +60,6 @@
configBootstrapOptions(bootstrap);
- List<ChannelFuture> channelFutures = Lists.newArrayList();
-
lispPorts.forEach(p -> {
InetSocketAddress sa = new InetSocketAddress(p);
channelFutures.add(bootstrap.bind(sa));
@@ -152,6 +151,7 @@
try {
// try to shutdown all open event groups
eventLoopGroup.shutdownGracefully().sync();
+ closeChannels(channelFutures);
} catch (InterruptedException e) {
log.warn("Failed to stop LISP controller. Reasons: {}.", e.getMessage());
}
diff --git a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java
index 0d1bdf0..5d82527 100644
--- a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java
+++ b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.lisp.ctl.impl;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,18 +30,29 @@
import org.onosproject.lisp.ctl.LispController;
import org.onosproject.lisp.ctl.LispMessageListener;
import org.onosproject.lisp.ctl.LispRouter;
+import org.onosproject.lisp.ctl.LispRouterAgent;
import org.onosproject.lisp.ctl.LispRouterId;
import org.onosproject.lisp.ctl.LispRouterListener;
import org.onosproject.lisp.msg.authentication.LispAuthenticationConfig;
-import org.onosproject.net.device.DeviceService;
+import org.onosproject.lisp.msg.protocols.LispInfoReply;
+import org.onosproject.lisp.msg.protocols.LispInfoRequest;
+import org.onosproject.lisp.msg.protocols.LispMessage;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Dictionary;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.stream.Collectors.toConcurrentMap;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* LISP controller initiation class.
@@ -51,8 +63,7 @@
private static final String APP_ID = "org.onosproject.lisp-base";
- private static final Logger log =
- LoggerFactory.getLogger(LispControllerImpl.class);
+ private static final Logger log = getLogger(LispControllerImpl.class);
private static final String DEFAULT_LISP_AUTH_KEY = "onos";
private static final short DEFAULT_LISP_AUTH_KEY_ID = 1;
@@ -61,45 +72,64 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
@Property(name = "lispAuthKey", value = DEFAULT_LISP_AUTH_KEY,
label = "Authentication key which is used to calculate authentication " +
"data for LISP control message; default value is onos")
- protected String lispAuthKey = DEFAULT_LISP_AUTH_KEY;
+ private String lispAuthKey = DEFAULT_LISP_AUTH_KEY;
@Property(name = "lispAuthKeyId", intValue = DEFAULT_LISP_AUTH_KEY_ID,
label = "Authentication key id which denotes the authentication method " +
"that ONOS uses to calculate the authentication data; " +
"1 denotes HMAC SHA1 encryption, 2 denotes HMAC SHA256 encryption; " +
"default value is 1")
- protected int lispAuthKeyId = DEFAULT_LISP_AUTH_KEY_ID;
+ private int lispAuthKeyId = DEFAULT_LISP_AUTH_KEY_ID;
+
+ ExecutorService executorMessages =
+ newFixedThreadPool(4, groupedThreads("onos/lisp", "event-stats-%d", log));
+
+ protected LispRouterAgent agent = new DefaultLispRouterAgent();
+
+ ConcurrentMap<LispRouterId, LispRouter> connectedRouters = Maps.newConcurrentMap();
+
+ private Set<LispRouterListener> lispRouterListeners = new CopyOnWriteArraySet<>();
+ private Set<LispMessageListener> lispMessageListeners = new CopyOnWriteArraySet<>();
private final LispControllerBootstrap bootstrap = new LispControllerBootstrap();
private final LispAuthenticationConfig authConfig = LispAuthenticationConfig.getInstance();
@Activate
public void activate(ComponentContext context) {
+ coreService.registerApplication(APP_ID, this::cleanup);
cfgService.registerProperties(getClass());
- coreService.registerApplication(APP_ID);
initAuthConfig(context.getProperties());
bootstrap.start();
log.info("Started");
}
+ /**
+ * Shutdowns all listening channel and all LISP channels.
+ * Clean information about routers before deactivating.
+ */
+ private void cleanup() {
+ bootstrap.stop();
+ connectedRouters.values().forEach(LispRouter::disconnectRouter);
+ connectedRouters.clear();
+ }
+
@Deactivate
public void deactivate() {
+ cleanup();
cfgService.unregisterProperties(getClass(), false);
- bootstrap.stop();
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
readComponentConfiguration(context);
+ bootstrap.stop();
+ bootstrap.start();
}
/**
@@ -138,31 +168,189 @@
@Override
public Iterable<LispRouter> getRouters() {
- return null;
+ return connectedRouters.values();
+ }
+
+ @Override
+ public Iterable<LispRouter> getSubscribedRouters() {
+ return connectedRouters.entrySet()
+ .stream()
+ .filter(e -> e.getValue().isSubscribed())
+ .collect(toConcurrentMap(Map.Entry::getKey,
+ Map.Entry::getValue)).values();
}
@Override
public LispRouter getRouter(LispRouterId routerId) {
- return null;
+ return connectedRouters.get(routerId);
}
@Override
public void addRouterListener(LispRouterListener listener) {
-
+ if (!lispRouterListeners.contains(listener)) {
+ lispRouterListeners.add(listener);
+ }
}
@Override
public void removeRouterListener(LispRouterListener listener) {
-
+ lispRouterListeners.remove(listener);
}
@Override
public void addMessageListener(LispMessageListener listener) {
-
+ if (!lispMessageListeners.contains(listener)) {
+ lispMessageListeners.add(listener);
+ }
}
@Override
public void removeMessageListener(LispMessageListener listener) {
+ lispMessageListeners.remove(listener);
+ }
+ /**
+ * Implementation of a LISP agent which is responsible for keeping track of
+ * connected LISP routers and the state in which they are in.
+ */
+ public final class DefaultLispRouterAgent implements LispRouterAgent {
+
+ private final Logger log = getLogger(DefaultLispRouterAgent.class);
+
+ /**
+ * Prevents object instantiation from external class.
+ */
+ private DefaultLispRouterAgent() {
+ }
+
+ @Override
+ public boolean addConnectedRouter(LispRouterId routerId, LispRouter router) {
+
+ if (connectedRouters.get(routerId) != null) {
+ log.error("Trying to add connectedRouter but found a previous " +
+ "value for routerId: {}", routerId);
+ return false;
+ } else {
+ log.info("Added router {}", routerId);
+ connectedRouters.put(routerId, router);
+ for (LispRouterListener listener : lispRouterListeners) {
+ listener.routerAdded(routerId);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public void removeConnectedRouter(LispRouterId routerId) {
+
+ if (connectedRouters.get(routerId) == null) {
+ log.error("Trying to remove router {} from connectedRouter " +
+ "list but no element was found", routerId);
+ } else {
+ log.info("Removed router {}", routerId);
+ connectedRouters.remove(routerId);
+ for (LispRouterListener listener : lispRouterListeners) {
+ listener.routerRemoved(routerId);
+ }
+ }
+ }
+
+ @Override
+ public void processUpstreamMessage(LispRouterId routerId, LispMessage message) {
+
+ switch (message.getType()) {
+ case LISP_MAP_REGISTER:
+ case LISP_MAP_REQUEST:
+ executorMessages.execute(
+ new LispIncomingMessageHandler(routerId, message));
+ break;
+ case LISP_INFO:
+ if (message instanceof LispInfoRequest) {
+ executorMessages.execute(
+ new LispIncomingMessageHandler(routerId, message));
+ } else {
+ log.warn("Not incoming LISP control message");
+ }
+ break;
+ default:
+ log.warn("Not incoming LISP control message");
+ break;
+ }
+ }
+
+ @Override
+ public void processDownstreamMessage(LispRouterId routerId, LispMessage message) {
+
+ switch (message.getType()) {
+ case LISP_MAP_NOTIFY:
+ case LISP_MAP_REPLY:
+ executorMessages.execute(
+ new LispOutgoingMessageHandler(routerId, message));
+ break;
+ case LISP_INFO:
+ if (message instanceof LispInfoReply) {
+ executorMessages.execute(
+ new LispOutgoingMessageHandler(routerId, message));
+ } else {
+ log.warn("Not outgoing LISP control message");
+ }
+ break;
+ default:
+ log.warn("Not outgoing LISP control message");
+ break;
+ }
+ }
+ }
+
+ /**
+ * LISP message handler.
+ */
+ protected class LispMessageHandler implements Runnable {
+
+ private final LispRouterId routerId;
+ private final LispMessage message;
+ private final boolean isIncoming;
+
+ LispMessageHandler(LispRouterId routerId,
+ LispMessage message, boolean isIncoming) {
+ this.routerId = routerId;
+ this.message = message;
+ this.isIncoming = isIncoming;
+ }
+
+ @Override
+ public void run() {
+ for (LispMessageListener listener : lispMessageListeners) {
+ if (isIncoming) {
+ listener.handleIncomingMessage(routerId, message);
+ } else {
+ listener.handleOutgoingMessage(routerId, message);
+ }
+ }
+ }
+ }
+
+ /**
+ * LISP incoming message handler.
+ */
+ protected final class LispIncomingMessageHandler
+ extends LispMessageHandler implements Runnable {
+
+ LispIncomingMessageHandler(LispRouterId routerId,
+ LispMessage message) {
+ super(routerId, message, true);
+ }
+ }
+
+ /**
+ * LISP outgoing message handler.
+ */
+ protected final class LispOutgoingMessageHandler
+ extends LispMessageHandler implements Runnable {
+
+ LispOutgoingMessageHandler(LispRouterId routerId,
+ LispMessage message) {
+ super(routerId, message, false);
+ }
}
}