[ONOS-5607] Revise LISP ctrl impl class to track msgs and routers

Change-Id: I4a51a8ef9162e3feee543f40fa92a0435186d1c9
diff --git a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java
index 29af596..449983a 100644
--- a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java
+++ b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerBootstrap.java
@@ -48,6 +48,7 @@
 
     private EventLoopGroup eventLoopGroup;
     private Class<? extends AbstractChannel> channelClass;
+    private List<ChannelFuture> channelFutures = Lists.newArrayList();
 
     /**
      * Stitches all channel handlers into server bootstrap.
@@ -59,8 +60,6 @@
 
             configBootstrapOptions(bootstrap);
 
-            List<ChannelFuture> channelFutures = Lists.newArrayList();
-
             lispPorts.forEach(p -> {
                 InetSocketAddress sa = new InetSocketAddress(p);
                 channelFutures.add(bootstrap.bind(sa));
@@ -152,6 +151,7 @@
         try {
             // try to shutdown all open event groups
             eventLoopGroup.shutdownGracefully().sync();
+            closeChannels(channelFutures);
         } catch (InterruptedException e) {
             log.warn("Failed to stop LISP controller. Reasons: {}.", e.getMessage());
         }
diff --git a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java
index 0d1bdf0..5d82527 100644
--- a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java
+++ b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispControllerImpl.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.lisp.ctl.impl;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,18 +30,29 @@
 import org.onosproject.lisp.ctl.LispController;
 import org.onosproject.lisp.ctl.LispMessageListener;
 import org.onosproject.lisp.ctl.LispRouter;
+import org.onosproject.lisp.ctl.LispRouterAgent;
 import org.onosproject.lisp.ctl.LispRouterId;
 import org.onosproject.lisp.ctl.LispRouterListener;
 import org.onosproject.lisp.msg.authentication.LispAuthenticationConfig;
-import org.onosproject.net.device.DeviceService;
+import org.onosproject.lisp.msg.protocols.LispInfoReply;
+import org.onosproject.lisp.msg.protocols.LispInfoRequest;
+import org.onosproject.lisp.msg.protocols.LispMessage;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Dictionary;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
 
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.stream.Collectors.toConcurrentMap;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * LISP controller initiation class.
@@ -51,8 +63,7 @@
 
     private static final String APP_ID = "org.onosproject.lisp-base";
 
-    private static final Logger log =
-            LoggerFactory.getLogger(LispControllerImpl.class);
+    private static final Logger log = getLogger(LispControllerImpl.class);
 
     private static final String DEFAULT_LISP_AUTH_KEY = "onos";
     private static final short DEFAULT_LISP_AUTH_KEY_ID = 1;
@@ -61,45 +72,64 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
     @Property(name = "lispAuthKey", value = DEFAULT_LISP_AUTH_KEY,
             label = "Authentication key which is used to calculate authentication " +
                     "data for LISP control message; default value is onos")
-    protected String lispAuthKey = DEFAULT_LISP_AUTH_KEY;
+    private String lispAuthKey = DEFAULT_LISP_AUTH_KEY;
 
     @Property(name = "lispAuthKeyId", intValue = DEFAULT_LISP_AUTH_KEY_ID,
             label = "Authentication key id which denotes the authentication method " +
                     "that ONOS uses to calculate the authentication data; " +
                     "1 denotes HMAC SHA1 encryption, 2 denotes HMAC SHA256 encryption; " +
                     "default value is 1")
-    protected int lispAuthKeyId = DEFAULT_LISP_AUTH_KEY_ID;
+    private int lispAuthKeyId = DEFAULT_LISP_AUTH_KEY_ID;
+
+    ExecutorService executorMessages =
+            newFixedThreadPool(4, groupedThreads("onos/lisp", "event-stats-%d", log));
+
+    protected LispRouterAgent agent = new DefaultLispRouterAgent();
+
+    ConcurrentMap<LispRouterId, LispRouter> connectedRouters = Maps.newConcurrentMap();
+
+    private Set<LispRouterListener> lispRouterListeners = new CopyOnWriteArraySet<>();
+    private Set<LispMessageListener> lispMessageListeners = new CopyOnWriteArraySet<>();
 
     private final LispControllerBootstrap bootstrap = new LispControllerBootstrap();
     private final LispAuthenticationConfig authConfig = LispAuthenticationConfig.getInstance();
 
     @Activate
     public void activate(ComponentContext context) {
+        coreService.registerApplication(APP_ID, this::cleanup);
         cfgService.registerProperties(getClass());
-        coreService.registerApplication(APP_ID);
         initAuthConfig(context.getProperties());
         bootstrap.start();
         log.info("Started");
     }
 
+    /**
+     * Shutdowns all listening channel and all LISP channels.
+     * Clean information about routers before deactivating.
+     */
+    private void cleanup() {
+        bootstrap.stop();
+        connectedRouters.values().forEach(LispRouter::disconnectRouter);
+        connectedRouters.clear();
+    }
+
     @Deactivate
     public void deactivate() {
+        cleanup();
         cfgService.unregisterProperties(getClass(), false);
-        bootstrap.stop();
         log.info("Stopped");
     }
 
     @Modified
     public void modified(ComponentContext context) {
         readComponentConfiguration(context);
+        bootstrap.stop();
+        bootstrap.start();
     }
 
     /**
@@ -138,31 +168,189 @@
 
     @Override
     public Iterable<LispRouter> getRouters() {
-        return null;
+        return connectedRouters.values();
+    }
+
+    @Override
+    public Iterable<LispRouter> getSubscribedRouters() {
+        return connectedRouters.entrySet()
+                                .stream()
+                                .filter(e -> e.getValue().isSubscribed())
+                                .collect(toConcurrentMap(Map.Entry::getKey,
+                                         Map.Entry::getValue)).values();
     }
 
     @Override
     public LispRouter getRouter(LispRouterId routerId) {
-        return null;
+        return connectedRouters.get(routerId);
     }
 
     @Override
     public void addRouterListener(LispRouterListener listener) {
-
+        if (!lispRouterListeners.contains(listener)) {
+            lispRouterListeners.add(listener);
+        }
     }
 
     @Override
     public void removeRouterListener(LispRouterListener listener) {
-
+        lispRouterListeners.remove(listener);
     }
 
     @Override
     public void addMessageListener(LispMessageListener listener) {
-
+        if (!lispMessageListeners.contains(listener)) {
+            lispMessageListeners.add(listener);
+        }
     }
 
     @Override
     public void removeMessageListener(LispMessageListener listener) {
+        lispMessageListeners.remove(listener);
+    }
 
+    /**
+     * Implementation of a LISP agent which is responsible for keeping track of
+     * connected LISP routers and the state in which they are in.
+     */
+    public final class DefaultLispRouterAgent implements LispRouterAgent {
+
+        private final Logger log = getLogger(DefaultLispRouterAgent.class);
+
+        /**
+         * Prevents object instantiation from external class.
+         */
+        private DefaultLispRouterAgent() {
+        }
+
+        @Override
+        public boolean addConnectedRouter(LispRouterId routerId, LispRouter router) {
+
+            if (connectedRouters.get(routerId) != null) {
+                log.error("Trying to add connectedRouter but found a previous " +
+                          "value for routerId: {}", routerId);
+                return false;
+            } else {
+                log.info("Added router {}", routerId);
+                connectedRouters.put(routerId, router);
+                for (LispRouterListener listener : lispRouterListeners) {
+                    listener.routerAdded(routerId);
+                }
+                return true;
+            }
+        }
+
+        @Override
+        public void removeConnectedRouter(LispRouterId routerId) {
+
+            if (connectedRouters.get(routerId) == null) {
+                log.error("Trying to remove router {} from connectedRouter " +
+                          "list but no element was found", routerId);
+            } else {
+                log.info("Removed router {}", routerId);
+                connectedRouters.remove(routerId);
+                for (LispRouterListener listener : lispRouterListeners) {
+                    listener.routerRemoved(routerId);
+                }
+            }
+        }
+
+        @Override
+        public void processUpstreamMessage(LispRouterId routerId, LispMessage message) {
+
+            switch (message.getType()) {
+                case LISP_MAP_REGISTER:
+                case LISP_MAP_REQUEST:
+                    executorMessages.execute(
+                            new LispIncomingMessageHandler(routerId, message));
+                    break;
+                case LISP_INFO:
+                    if (message instanceof LispInfoRequest) {
+                        executorMessages.execute(
+                                new LispIncomingMessageHandler(routerId, message));
+                    } else {
+                        log.warn("Not incoming LISP control message");
+                    }
+                    break;
+                default:
+                    log.warn("Not incoming LISP control message");
+                    break;
+            }
+        }
+
+        @Override
+        public void processDownstreamMessage(LispRouterId routerId, LispMessage message) {
+
+            switch (message.getType()) {
+                case LISP_MAP_NOTIFY:
+                case LISP_MAP_REPLY:
+                    executorMessages.execute(
+                            new LispOutgoingMessageHandler(routerId, message));
+                    break;
+                case LISP_INFO:
+                    if (message instanceof LispInfoReply) {
+                        executorMessages.execute(
+                                new LispOutgoingMessageHandler(routerId, message));
+                    } else {
+                        log.warn("Not outgoing LISP control message");
+                    }
+                    break;
+                default:
+                    log.warn("Not outgoing LISP control message");
+                    break;
+            }
+        }
+    }
+
+    /**
+     * LISP message handler.
+     */
+    protected class LispMessageHandler implements Runnable {
+
+        private final LispRouterId routerId;
+        private final LispMessage message;
+        private final boolean isIncoming;
+
+        LispMessageHandler(LispRouterId routerId,
+                                  LispMessage message, boolean isIncoming) {
+            this.routerId = routerId;
+            this.message = message;
+            this.isIncoming = isIncoming;
+        }
+
+        @Override
+        public void run() {
+            for (LispMessageListener listener : lispMessageListeners) {
+                if (isIncoming) {
+                    listener.handleIncomingMessage(routerId, message);
+                } else {
+                    listener.handleOutgoingMessage(routerId, message);
+                }
+            }
+        }
+    }
+
+    /**
+     * LISP incoming message handler.
+     */
+    protected final class LispIncomingMessageHandler
+                    extends LispMessageHandler implements Runnable {
+
+        LispIncomingMessageHandler(LispRouterId routerId,
+                                          LispMessage message) {
+            super(routerId, message, true);
+        }
+    }
+
+    /**
+     * LISP outgoing message handler.
+     */
+    protected final class LispOutgoingMessageHandler
+                    extends LispMessageHandler implements Runnable {
+
+        LispOutgoingMessageHandler(LispRouterId routerId,
+                                          LispMessage message) {
+            super(routerId, message, false);
+        }
     }
 }