[AETHER-72] Refactoring RouteService

- to use bulk updates interface
- to use new getRoutesForNextHops API
- to use multi-thread resolver
- to use multi-thread hostexec
- to use a concurrent hashmap instead of synchronized
- to use a non-blocking resolved store

Additionally updates unit tests

Change-Id: Id960abd0f2a1b03066ce34b6a2f72b76566bb58c
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
index 72d5453..25a74ae 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableList;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.util.PredictableExecutor;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.Host;
 import org.onosproject.net.host.HostEvent;
@@ -32,7 +33,6 @@
 import org.onosproject.routeservice.RouteInfo;
 import org.onosproject.routeservice.RouteListener;
 import org.onosproject.routeservice.RouteService;
-import org.onosproject.routeservice.RouteSet;
 import org.onosproject.routeservice.RouteStore;
 import org.onosproject.routeservice.RouteStoreDelegate;
 import org.onosproject.routeservice.RouteTableId;
@@ -45,16 +45,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -71,6 +67,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final int DEFAULT_BUCKETS = 0;
+
     private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
     private InternalHostListener hostListener = new InternalHostListener();
 
@@ -90,18 +88,21 @@
 
     private RouteMonitor routeMonitor;
 
-    @GuardedBy(value = "this")
-    private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
+    protected RouteResolver routeResolver;
+
+    private Map<RouteListener, ListenerQueue> listeners = new ConcurrentHashMap<>();
 
     private ThreadFactory threadFactory;
 
-    protected Executor hostEventExecutor = newSingleThreadExecutor(
-        groupedThreads("rm-event-host", "%d", log));
+    protected PredictableExecutor hostEventExecutors;
 
     @Activate
     protected void activate() {
         routeMonitor = new RouteMonitor(this, clusterService, storageService);
+        routeResolver = new RouteResolver(this, hostService);
         threadFactory = groupedThreads("onos/route", "listener-%d", log);
+        hostEventExecutors = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-manager",
+                                                                                     "event-host-%d", log));
 
         resolvedRouteStore = new DefaultResolvedRouteStore();
 
@@ -110,15 +111,14 @@
 
         routeStore.getRouteTables().stream()
                 .flatMap(id -> routeStore.getRoutes(id).stream())
-                .forEach(this::resolve);
+                .forEach(routeSet -> routeResolver.resolve(routeSet));
     }
 
     @Deactivate
     protected void deactivate() {
         routeMonitor.shutdown();
-        synchronized (this) {
-            listeners.values().forEach(ListenerQueue::stop);
-        }
+        routeResolver.shutdown();
+        listeners.values().forEach(ListenerQueue::stop);
 
         routeStore.unsetDelegate(delegate);
         hostService.removeListener(hostListener);
@@ -136,8 +136,9 @@
      */
     @Override
     public void addListener(RouteListener listener) {
-        synchronized (this) {
-            log.debug("Synchronizing current routes to new listener");
+        log.debug("Synchronizing current routes to new listener");
+        ListenerQueue listenerQueue = listeners.compute(listener, (key, value) -> {
+            // Create listener regardless the existence of a previous value
             ListenerQueue l = createListenerQueue(listener);
             resolvedRouteStore.getRouteTables().stream()
                     .map(resolvedRouteStore::getRoutes)
@@ -145,21 +146,18 @@
                     .map(route -> new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
                                                  resolvedRouteStore.getAllRoutes(route.prefix())))
                     .forEach(l::post);
-
-            listeners.put(listener, l);
-
-            l.start();
-            log.debug("Route synchronization complete");
-        }
+            return l;
+        });
+        // Start draining the events
+        listenerQueue.start();
+        log.debug("Route synchronization complete");
     }
 
     @Override
     public void removeListener(RouteListener listener) {
-        synchronized (this) {
-            ListenerQueue l = listeners.remove(listener);
-            if (l != null) {
-                l.stop();
-            }
+        ListenerQueue l = listeners.remove(listener);
+        if (l != null) {
+            l.stop();
         }
     }
 
@@ -171,16 +169,10 @@
     private void post(RouteEvent event) {
         if (event != null) {
             log.debug("Sending event {}", event);
-            synchronized (this) {
-                listeners.values().forEach(l -> l.post(event));
-            }
+            listeners.values().forEach(l -> l.post(event));
         }
     }
 
-    private Collection<Route> reformatRoutes(Collection<RouteSet> routeSets) {
-        return routeSets.stream().flatMap(r -> r.routes().stream()).collect(Collectors.toList());
-    }
-
     @Override
     public Collection<RouteTableId> getRouteTables() {
         return routeStore.getRouteTables();
@@ -190,24 +182,11 @@
     public Collection<RouteInfo> getRoutes(RouteTableId id) {
         return routeStore.getRoutes(id).stream()
                 .map(routeSet -> new RouteInfo(routeSet.prefix(),
-                        resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null), resolveRouteSet(routeSet)))
+                                               resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null),
+                                               routeResolver.resolveRouteSet(routeSet)))
                 .collect(Collectors.toList());
     }
 
-    private Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
-        return routeSet.routes().stream()
-                .map(this::tryResolve)
-                .collect(Collectors.toSet());
-    }
-
-    private ResolvedRoute tryResolve(Route route) {
-        ResolvedRoute resolvedRoute = resolve(route);
-        if (resolvedRoute == null) {
-            resolvedRoute = new ResolvedRoute(route, null, null);
-        }
-        return resolvedRoute;
-    }
-
     @Override
     public Collection<ResolvedRoute> getResolvedRoutes(RouteTableId id) {
         return resolvedRouteStore.getRoutes(id);
@@ -225,22 +204,14 @@
 
     @Override
     public void update(Collection<Route> routes) {
-        synchronized (this) {
-            routes.forEach(route -> {
-                log.debug("Received update {}", route);
-                routeStore.updateRoute(route);
-            });
-        }
+        log.debug("Received update {}", routes);
+        routeStore.updateRoutes(routes);
     }
 
     @Override
     public void withdraw(Collection<Route> routes) {
-        synchronized (this) {
-            routes.forEach(route -> {
-                log.debug("Received withdraw {}", route);
-                routeStore.removeRoute(route);
-            });
-        }
+        log.debug("Received withdraw {}", routes);
+        routeStore.removeRoutes(routes);
     }
 
     @Override
@@ -250,48 +221,14 @@
                 .orElse(null);
     }
 
-    private ResolvedRoute resolve(Route route) {
-        hostService.startMonitoringIp(route.nextHop());
-        Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
-
-        return hosts.stream().findFirst()
-                .map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
-                .orElse(null);
-    }
-
-    private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
-        return Comparator.comparing(ResolvedRoute::nextHop)
-                       .compare(route1, route2) <= 0 ? route1 : route2;
-    }
-
-    private void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+    void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
         post(resolvedRouteStore.updateRoute(route, alternatives));
     }
 
-    private void remove(IpPrefix prefix) {
+    void remove(IpPrefix prefix) {
         post(resolvedRouteStore.removeRoute(prefix));
     }
 
-    private void resolve(RouteSet routes) {
-        if (routes.routes() == null) {
-            // The routes were removed before we got to them, nothing to do
-            return;
-        }
-        Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
-                .map(this::resolve)
-                .filter(Objects::nonNull)
-                .collect(Collectors.toSet());
-
-        Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
-                    .reduce(this::decide);
-
-        if (bestRoute.isPresent()) {
-            store(bestRoute.get(), resolvedRoutes);
-        } else {
-            remove(routes.prefix());
-        }
-    }
-
     private void hostUpdated(Host host) {
         hostChanged(host);
     }
@@ -301,12 +238,8 @@
     }
 
     private void hostChanged(Host host) {
-        synchronized (this) {
-            host.ipAddresses().stream()
-                    .flatMap(ip -> routeStore.getRoutesForNextHop(ip).stream())
-                    .map(route -> routeStore.getRoutes(route.prefix()))
-                    .forEach(this::resolve);
-        }
+        routeStore.getRoutesForNextHops(host.ipAddresses())
+                .forEach(routeSet -> routeResolver.resolve(routeSet));
     }
 
     /**
@@ -377,10 +310,10 @@
         public void notify(InternalRouteEvent event) {
             switch (event.type()) {
             case ROUTE_ADDED:
-                resolve(event.subject());
+                routeResolver.resolve(event.subject());
                 break;
             case ROUTE_REMOVED:
-                resolve(event.subject());
+                routeResolver.resolve(event.subject());
                 break;
             default:
                 break;
@@ -399,11 +332,11 @@
             case HOST_UPDATED:
             case HOST_MOVED:
                 log.trace("Scheduled host event {}", event);
-                hostEventExecutor.execute(() -> hostUpdated(event.subject()));
+                hostEventExecutors.execute(() -> hostUpdated(event.subject()), event.subject().id().hashCode());
                 break;
             case HOST_REMOVED:
                 log.trace("Scheduled host event {}", event);
-                hostEventExecutor.execute(() -> hostRemoved(event.subject()));
+                hostEventExecutors.execute(() -> hostRemoved(event.subject()), event.subject().id().hashCode());
                 break;
             default:
                 break;