[AETHER-72] Refactoring RouteService
- to use the bulk update interface
- to use the new getRoutesForNextHops API
- to use a multi-threaded resolver
- to use a multi-threaded host event executor
- to use a concurrent hashmap instead of synchronized blocks
- to use a non-blocking resolved store
Additionally, updates the unit tests
Change-Id: Id960abd0f2a1b03066ce34b6a2f72b76566bb58c
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
index 72d5453..25a74ae 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onlab.util.PredictableExecutor;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.Host;
import org.onosproject.net.host.HostEvent;
@@ -32,7 +33,6 @@
import org.onosproject.routeservice.RouteInfo;
import org.onosproject.routeservice.RouteListener;
import org.onosproject.routeservice.RouteService;
-import org.onosproject.routeservice.RouteSet;
import org.onosproject.routeservice.RouteStore;
import org.onosproject.routeservice.RouteStoreDelegate;
import org.onosproject.routeservice.RouteTableId;
@@ -45,16 +45,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -71,6 +67,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final int DEFAULT_BUCKETS = 0;
+
private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
private InternalHostListener hostListener = new InternalHostListener();
@@ -90,18 +88,21 @@
private RouteMonitor routeMonitor;
- @GuardedBy(value = "this")
- private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
+ protected RouteResolver routeResolver;
+
+ private Map<RouteListener, ListenerQueue> listeners = new ConcurrentHashMap<>();
private ThreadFactory threadFactory;
- protected Executor hostEventExecutor = newSingleThreadExecutor(
- groupedThreads("rm-event-host", "%d", log));
+ protected PredictableExecutor hostEventExecutors;
@Activate
protected void activate() {
routeMonitor = new RouteMonitor(this, clusterService, storageService);
+ routeResolver = new RouteResolver(this, hostService);
threadFactory = groupedThreads("onos/route", "listener-%d", log);
+ hostEventExecutors = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-manager",
+ "event-host-%d", log));
resolvedRouteStore = new DefaultResolvedRouteStore();
@@ -110,15 +111,14 @@
routeStore.getRouteTables().stream()
.flatMap(id -> routeStore.getRoutes(id).stream())
- .forEach(this::resolve);
+ .forEach(routeSet -> routeResolver.resolve(routeSet));
}
@Deactivate
protected void deactivate() {
routeMonitor.shutdown();
- synchronized (this) {
- listeners.values().forEach(ListenerQueue::stop);
- }
+ routeResolver.shutdown();
+ listeners.values().forEach(ListenerQueue::stop);
routeStore.unsetDelegate(delegate);
hostService.removeListener(hostListener);
@@ -136,8 +136,9 @@
*/
@Override
public void addListener(RouteListener listener) {
- synchronized (this) {
- log.debug("Synchronizing current routes to new listener");
+ log.debug("Synchronizing current routes to new listener");
+ ListenerQueue listenerQueue = listeners.compute(listener, (key, value) -> {
+ // Create listener regardless of the existence of a previous value
ListenerQueue l = createListenerQueue(listener);
resolvedRouteStore.getRouteTables().stream()
.map(resolvedRouteStore::getRoutes)
@@ -145,21 +146,18 @@
.map(route -> new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
resolvedRouteStore.getAllRoutes(route.prefix())))
.forEach(l::post);
-
- listeners.put(listener, l);
-
- l.start();
- log.debug("Route synchronization complete");
- }
+ return l;
+ });
+ // Start draining the events
+ listenerQueue.start();
+ log.debug("Route synchronization complete");
}
@Override
public void removeListener(RouteListener listener) {
- synchronized (this) {
- ListenerQueue l = listeners.remove(listener);
- if (l != null) {
- l.stop();
- }
+ ListenerQueue l = listeners.remove(listener);
+ if (l != null) {
+ l.stop();
}
}
@@ -171,16 +169,10 @@
private void post(RouteEvent event) {
if (event != null) {
log.debug("Sending event {}", event);
- synchronized (this) {
- listeners.values().forEach(l -> l.post(event));
- }
+ listeners.values().forEach(l -> l.post(event));
}
}
- private Collection<Route> reformatRoutes(Collection<RouteSet> routeSets) {
- return routeSets.stream().flatMap(r -> r.routes().stream()).collect(Collectors.toList());
- }
-
@Override
public Collection<RouteTableId> getRouteTables() {
return routeStore.getRouteTables();
@@ -190,24 +182,11 @@
public Collection<RouteInfo> getRoutes(RouteTableId id) {
return routeStore.getRoutes(id).stream()
.map(routeSet -> new RouteInfo(routeSet.prefix(),
- resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null), resolveRouteSet(routeSet)))
+ resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null),
+ routeResolver.resolveRouteSet(routeSet)))
.collect(Collectors.toList());
}
- private Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
- return routeSet.routes().stream()
- .map(this::tryResolve)
- .collect(Collectors.toSet());
- }
-
- private ResolvedRoute tryResolve(Route route) {
- ResolvedRoute resolvedRoute = resolve(route);
- if (resolvedRoute == null) {
- resolvedRoute = new ResolvedRoute(route, null, null);
- }
- return resolvedRoute;
- }
-
@Override
public Collection<ResolvedRoute> getResolvedRoutes(RouteTableId id) {
return resolvedRouteStore.getRoutes(id);
@@ -225,22 +204,14 @@
@Override
public void update(Collection<Route> routes) {
- synchronized (this) {
- routes.forEach(route -> {
- log.debug("Received update {}", route);
- routeStore.updateRoute(route);
- });
- }
+ log.debug("Received update {}", routes);
+ routeStore.updateRoutes(routes);
}
@Override
public void withdraw(Collection<Route> routes) {
- synchronized (this) {
- routes.forEach(route -> {
- log.debug("Received withdraw {}", route);
- routeStore.removeRoute(route);
- });
- }
+ log.debug("Received withdraw {}", routes);
+ routeStore.removeRoutes(routes);
}
@Override
@@ -250,48 +221,14 @@
.orElse(null);
}
- private ResolvedRoute resolve(Route route) {
- hostService.startMonitoringIp(route.nextHop());
- Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
-
- return hosts.stream().findFirst()
- .map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
- .orElse(null);
- }
-
- private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
- return Comparator.comparing(ResolvedRoute::nextHop)
- .compare(route1, route2) <= 0 ? route1 : route2;
- }
-
- private void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+ void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
post(resolvedRouteStore.updateRoute(route, alternatives));
}
- private void remove(IpPrefix prefix) {
+ void remove(IpPrefix prefix) {
post(resolvedRouteStore.removeRoute(prefix));
}
- private void resolve(RouteSet routes) {
- if (routes.routes() == null) {
- // The routes were removed before we got to them, nothing to do
- return;
- }
- Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
- .map(this::resolve)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
-
- Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
- .reduce(this::decide);
-
- if (bestRoute.isPresent()) {
- store(bestRoute.get(), resolvedRoutes);
- } else {
- remove(routes.prefix());
- }
- }
-
private void hostUpdated(Host host) {
hostChanged(host);
}
@@ -301,12 +238,8 @@
}
private void hostChanged(Host host) {
- synchronized (this) {
- host.ipAddresses().stream()
- .flatMap(ip -> routeStore.getRoutesForNextHop(ip).stream())
- .map(route -> routeStore.getRoutes(route.prefix()))
- .forEach(this::resolve);
- }
+ routeStore.getRoutesForNextHops(host.ipAddresses())
+ .forEach(routeSet -> routeResolver.resolve(routeSet));
}
/**
@@ -377,10 +310,10 @@
public void notify(InternalRouteEvent event) {
switch (event.type()) {
case ROUTE_ADDED:
- resolve(event.subject());
+ routeResolver.resolve(event.subject());
break;
case ROUTE_REMOVED:
- resolve(event.subject());
+ routeResolver.resolve(event.subject());
break;
default:
break;
@@ -399,11 +332,11 @@
case HOST_UPDATED:
case HOST_MOVED:
log.trace("Scheduled host event {}", event);
- hostEventExecutor.execute(() -> hostUpdated(event.subject()));
+ hostEventExecutors.execute(() -> hostUpdated(event.subject()), event.subject().id().hashCode());
break;
case HOST_REMOVED:
log.trace("Scheduled host event {}", event);
- hostEventExecutor.execute(() -> hostRemoved(event.subject()));
+ hostEventExecutors.execute(() -> hostRemoved(event.subject()), event.subject().id().hashCode());
break;
default:
break;