[AETHER-72] Refactoring RouteService

- to use the bulk updates interface
- to use the new getRoutesForNextHops API
- to use a multi-threaded resolver
- to use a multi-threaded host event executor
- to use a concurrent hashmap instead of synchronized blocks
- to use a non-blocking resolved route store

Additionally updates unit tests

Change-Id: Id960abd0f2a1b03066ce34b6a2f72b76566bb58c
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
index 725a7a9..51ef8c7 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
@@ -17,7 +17,6 @@
 package org.onosproject.routeservice.impl;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 import com.googlecode.concurrenttrees.common.KeyValuePair;
 import com.googlecode.concurrenttrees.radix.node.concrete.DefaultByteArrayNodeFactory;
 import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
@@ -121,7 +120,7 @@
             routeTable = new ConcurrentInvertedRadixTree<>(
                     new DefaultByteArrayNodeFactory());
 
-            alternativeRoutes = Maps.newHashMap();
+            alternativeRoutes = new ConcurrentHashMap<>();
         }
 
         /**
@@ -133,7 +132,6 @@
         public RouteEvent update(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
             Set<ResolvedRoute> immutableAlternatives = checkAlternatives(route, alternatives);
 
-            synchronized (this) {
                 ResolvedRoute oldRoute = routeTable.put(createBinaryString(route.prefix()), route);
                 Set<ResolvedRoute> oldRoutes = alternativeRoutes.put(route.prefix(), immutableAlternatives);
 
@@ -153,7 +151,6 @@
                 }
 
                 return null;
-            }
         }
 
         /**
@@ -181,18 +178,16 @@
          * @param prefix prefix to remove
          */
         public RouteEvent remove(IpPrefix prefix) {
-            synchronized (this) {
-                String key = createBinaryString(prefix);
+            String key = createBinaryString(prefix);
 
-                ResolvedRoute route = routeTable.getValueForExactKey(key);
-                Set<ResolvedRoute> alternatives = alternativeRoutes.remove(prefix);
+            ResolvedRoute route = routeTable.getValueForExactKey(key);
+            Set<ResolvedRoute> alternatives = alternativeRoutes.remove(prefix);
 
-                if (route != null) {
-                    routeTable.remove(key);
-                    return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route, alternatives);
-                }
-                return null;
+            if (route != null) {
+                routeTable.remove(key);
+                return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route, alternatives);
             }
+            return null;
         }
 
         /**
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
index 72d5453..25a74ae 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableList;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.util.PredictableExecutor;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.Host;
 import org.onosproject.net.host.HostEvent;
@@ -32,7 +33,6 @@
 import org.onosproject.routeservice.RouteInfo;
 import org.onosproject.routeservice.RouteListener;
 import org.onosproject.routeservice.RouteService;
-import org.onosproject.routeservice.RouteSet;
 import org.onosproject.routeservice.RouteStore;
 import org.onosproject.routeservice.RouteStoreDelegate;
 import org.onosproject.routeservice.RouteTableId;
@@ -45,16 +45,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -71,6 +67,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final int DEFAULT_BUCKETS = 0;
+
     private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
     private InternalHostListener hostListener = new InternalHostListener();
 
@@ -90,18 +88,21 @@
 
     private RouteMonitor routeMonitor;
 
-    @GuardedBy(value = "this")
-    private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
+    protected RouteResolver routeResolver;
+
+    private Map<RouteListener, ListenerQueue> listeners = new ConcurrentHashMap<>();
 
     private ThreadFactory threadFactory;
 
-    protected Executor hostEventExecutor = newSingleThreadExecutor(
-        groupedThreads("rm-event-host", "%d", log));
+    protected PredictableExecutor hostEventExecutors;
 
     @Activate
     protected void activate() {
         routeMonitor = new RouteMonitor(this, clusterService, storageService);
+        routeResolver = new RouteResolver(this, hostService);
         threadFactory = groupedThreads("onos/route", "listener-%d", log);
+        hostEventExecutors = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-manager",
+                                                                                     "event-host-%d", log));
 
         resolvedRouteStore = new DefaultResolvedRouteStore();
 
@@ -110,15 +111,14 @@
 
         routeStore.getRouteTables().stream()
                 .flatMap(id -> routeStore.getRoutes(id).stream())
-                .forEach(this::resolve);
+                .forEach(routeSet -> routeResolver.resolve(routeSet));
     }
 
     @Deactivate
     protected void deactivate() {
         routeMonitor.shutdown();
-        synchronized (this) {
-            listeners.values().forEach(ListenerQueue::stop);
-        }
+        routeResolver.shutdown();
+        listeners.values().forEach(ListenerQueue::stop);
 
         routeStore.unsetDelegate(delegate);
         hostService.removeListener(hostListener);
@@ -136,8 +136,9 @@
      */
     @Override
     public void addListener(RouteListener listener) {
-        synchronized (this) {
-            log.debug("Synchronizing current routes to new listener");
+        log.debug("Synchronizing current routes to new listener");
+        ListenerQueue listenerQueue = listeners.compute(listener, (key, value) -> {
+            // Create the listener queue regardless of the existence of a previous value
             ListenerQueue l = createListenerQueue(listener);
             resolvedRouteStore.getRouteTables().stream()
                     .map(resolvedRouteStore::getRoutes)
@@ -145,21 +146,18 @@
                     .map(route -> new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
                                                  resolvedRouteStore.getAllRoutes(route.prefix())))
                     .forEach(l::post);
-
-            listeners.put(listener, l);
-
-            l.start();
-            log.debug("Route synchronization complete");
-        }
+            return l;
+        });
+        // Start draining the events
+        listenerQueue.start();
+        log.debug("Route synchronization complete");
     }
 
     @Override
     public void removeListener(RouteListener listener) {
-        synchronized (this) {
-            ListenerQueue l = listeners.remove(listener);
-            if (l != null) {
-                l.stop();
-            }
+        ListenerQueue l = listeners.remove(listener);
+        if (l != null) {
+            l.stop();
         }
     }
 
@@ -171,16 +169,10 @@
     private void post(RouteEvent event) {
         if (event != null) {
             log.debug("Sending event {}", event);
-            synchronized (this) {
-                listeners.values().forEach(l -> l.post(event));
-            }
+            listeners.values().forEach(l -> l.post(event));
         }
     }
 
-    private Collection<Route> reformatRoutes(Collection<RouteSet> routeSets) {
-        return routeSets.stream().flatMap(r -> r.routes().stream()).collect(Collectors.toList());
-    }
-
     @Override
     public Collection<RouteTableId> getRouteTables() {
         return routeStore.getRouteTables();
@@ -190,24 +182,11 @@
     public Collection<RouteInfo> getRoutes(RouteTableId id) {
         return routeStore.getRoutes(id).stream()
                 .map(routeSet -> new RouteInfo(routeSet.prefix(),
-                        resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null), resolveRouteSet(routeSet)))
+                                               resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null),
+                                               routeResolver.resolveRouteSet(routeSet)))
                 .collect(Collectors.toList());
     }
 
-    private Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
-        return routeSet.routes().stream()
-                .map(this::tryResolve)
-                .collect(Collectors.toSet());
-    }
-
-    private ResolvedRoute tryResolve(Route route) {
-        ResolvedRoute resolvedRoute = resolve(route);
-        if (resolvedRoute == null) {
-            resolvedRoute = new ResolvedRoute(route, null, null);
-        }
-        return resolvedRoute;
-    }
-
     @Override
     public Collection<ResolvedRoute> getResolvedRoutes(RouteTableId id) {
         return resolvedRouteStore.getRoutes(id);
@@ -225,22 +204,14 @@
 
     @Override
     public void update(Collection<Route> routes) {
-        synchronized (this) {
-            routes.forEach(route -> {
-                log.debug("Received update {}", route);
-                routeStore.updateRoute(route);
-            });
-        }
+        log.debug("Received update {}", routes);
+        routeStore.updateRoutes(routes);
     }
 
     @Override
     public void withdraw(Collection<Route> routes) {
-        synchronized (this) {
-            routes.forEach(route -> {
-                log.debug("Received withdraw {}", route);
-                routeStore.removeRoute(route);
-            });
-        }
+        log.debug("Received withdraw {}", routes);
+        routeStore.removeRoutes(routes);
     }
 
     @Override
@@ -250,48 +221,14 @@
                 .orElse(null);
     }
 
-    private ResolvedRoute resolve(Route route) {
-        hostService.startMonitoringIp(route.nextHop());
-        Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
-
-        return hosts.stream().findFirst()
-                .map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
-                .orElse(null);
-    }
-
-    private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
-        return Comparator.comparing(ResolvedRoute::nextHop)
-                       .compare(route1, route2) <= 0 ? route1 : route2;
-    }
-
-    private void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+    void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
         post(resolvedRouteStore.updateRoute(route, alternatives));
     }
 
-    private void remove(IpPrefix prefix) {
+    void remove(IpPrefix prefix) {
         post(resolvedRouteStore.removeRoute(prefix));
     }
 
-    private void resolve(RouteSet routes) {
-        if (routes.routes() == null) {
-            // The routes were removed before we got to them, nothing to do
-            return;
-        }
-        Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
-                .map(this::resolve)
-                .filter(Objects::nonNull)
-                .collect(Collectors.toSet());
-
-        Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
-                    .reduce(this::decide);
-
-        if (bestRoute.isPresent()) {
-            store(bestRoute.get(), resolvedRoutes);
-        } else {
-            remove(routes.prefix());
-        }
-    }
-
     private void hostUpdated(Host host) {
         hostChanged(host);
     }
@@ -301,12 +238,8 @@
     }
 
     private void hostChanged(Host host) {
-        synchronized (this) {
-            host.ipAddresses().stream()
-                    .flatMap(ip -> routeStore.getRoutesForNextHop(ip).stream())
-                    .map(route -> routeStore.getRoutes(route.prefix()))
-                    .forEach(this::resolve);
-        }
+        routeStore.getRoutesForNextHops(host.ipAddresses())
+                .forEach(routeSet -> routeResolver.resolve(routeSet));
     }
 
     /**
@@ -377,10 +310,10 @@
         public void notify(InternalRouteEvent event) {
             switch (event.type()) {
             case ROUTE_ADDED:
-                resolve(event.subject());
+                routeResolver.resolve(event.subject());
                 break;
             case ROUTE_REMOVED:
-                resolve(event.subject());
+                routeResolver.resolve(event.subject());
                 break;
             default:
                 break;
@@ -399,11 +332,11 @@
             case HOST_UPDATED:
             case HOST_MOVED:
                 log.trace("Scheduled host event {}", event);
-                hostEventExecutor.execute(() -> hostUpdated(event.subject()));
+                hostEventExecutors.execute(() -> hostUpdated(event.subject()), event.subject().id().hashCode());
                 break;
             case HOST_REMOVED:
                 log.trace("Scheduled host event {}", event);
-                hostEventExecutor.execute(() -> hostRemoved(event.subject()));
+                hostEventExecutors.execute(() -> hostRemoved(event.subject()), event.subject().id().hashCode());
                 break;
             default:
                 break;
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteResolver.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteResolver.java
new file mode 100644
index 0000000..5925904
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteResolver.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.onlab.util.PredictableExecutor;
+import org.onosproject.net.Host;
+import org.onosproject.net.host.HostService;
+import org.onosproject.routeservice.ResolvedRoute;
+import org.onosproject.routeservice.Route;
+import org.onosproject.routeservice.RouteSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * Resolves routes in a multi-threaded fashion.
+ */
+class RouteResolver {
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private static final int DEFAULT_BUCKETS = 0;
+    private RouteManager routeManager;
+    private HostService hostService;
+    protected PredictableExecutor routeResolvers;
+
+    /**
+     * Creates a new route resolver.
+     *
+     * @param routeManager route manager used to store and remove the resolved routes
+     * @param hostService host service
+     */
+    RouteResolver(RouteManager routeManager, HostService hostService) {
+        this.routeManager = routeManager;
+        this.hostService = hostService;
+        routeResolvers = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-resolver",
+                                                                                  "route-resolver-%d", log));
+    }
+
+    /**
+     * Shuts down the route resolver.
+     */
+    void shutdown() {
+        routeResolvers.shutdown();
+    }
+
+    private ResolvedRoute tryResolve(Route route) {
+        ResolvedRoute resolvedRoute = resolve(route);
+        if (resolvedRoute == null) {
+            resolvedRoute = new ResolvedRoute(route, null, null);
+        }
+        return resolvedRoute;
+    }
+
+    // Used by external reads
+    Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
+        return routeSet.routes().stream()
+                .map(this::tryResolve)
+                .collect(Collectors.toSet());
+    }
+
+    // Used by external reads and by resolvers
+    ResolvedRoute resolve(Route route) {
+        hostService.startMonitoringIp(route.nextHop());
+        Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
+
+        return hosts.stream().findFirst()
+                .map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
+                .orElse(null);
+    }
+
+    private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
+        return Comparator.comparing(ResolvedRoute::nextHop)
+                .compare(route1, route2) <= 0 ? route1 : route2;
+    }
+
+    private void resolveInternal(RouteSet routes) {
+        if (routes.routes() == null) {
+            // The routes were removed before we got to them, nothing to do
+            return;
+        }
+
+        Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
+                .map(this::resolve)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+
+        Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
+                .reduce(this::decide);
+
+        if (bestRoute.isPresent()) {
+            routeManager.store(bestRoute.get(), resolvedRoutes);
+        } else {
+            routeManager.remove(routes.prefix());
+        }
+    }
+
+    // Offload to the resolvers using prefix hashcode as hint
+    // TODO Remove RouteManager reference using PickyCallable
+    void resolve(RouteSet routes) {
+        routeResolvers.execute(() -> resolveInternal(routes), routes.prefix().hashCode());
+    }
+}