[AETHER-72] Refactoring RouteService
- to use bulk updates interface
- to use new getRoutesForNextHops API
- to use multi-thread resolver
- to use multi-thread hostexec
- to use a concurrent hashmap instead of synchronized
- to use a non-blocking resolved store
Additionally updates unit tests
Change-Id: Id960abd0f2a1b03066ce34b6a2f72b76566bb58c
diff --git a/apps/route-service/api/src/main/java/org/onosproject/routeservice/RouteStore.java b/apps/route-service/api/src/main/java/org/onosproject/routeservice/RouteStore.java
index 5eaa898..2171fcb 100644
--- a/apps/route-service/api/src/main/java/org/onosproject/routeservice/RouteStore.java
+++ b/apps/route-service/api/src/main/java/org/onosproject/routeservice/RouteStore.java
@@ -38,6 +38,13 @@
void updateRoute(Route route);
/**
+ * Adds or updates the given routes in the store.
+ *
+ * @param routes routes to add or update
+ */
+ void updateRoutes(Collection<Route> routes);
+
+ /**
* Removes the given route from the store.
*
* @param route route to remove
@@ -45,6 +52,13 @@
void removeRoute(Route route);
/**
+ * Removes the given routes from the store.
+ *
+ * @param routes routes to remove
+ */
+ void removeRoutes(Collection<Route> routes);
+
+ /**
* Replaces the all the routes for a prefix
* with the given route.
*
@@ -79,6 +93,14 @@
Collection<Route> getRoutesForNextHop(IpAddress ip);
/**
+ * Returns all routes that point to any of the given next hops IP addresses.
+ *
+ * @param nextHops next hops IP addresses
+ * @return collection of routes sets
+ */
+ Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops);
+
+ /**
* Returns the set of routes in the default route table for the given prefix.
*
* @param prefix IP prefix
diff --git a/apps/route-service/api/src/test/java/org/onosproject/routeservice/RouteStoreAdapter.java b/apps/route-service/api/src/test/java/org/onosproject/routeservice/RouteStoreAdapter.java
index 117a98b..1ede0fb 100644
--- a/apps/route-service/api/src/test/java/org/onosproject/routeservice/RouteStoreAdapter.java
+++ b/apps/route-service/api/src/test/java/org/onosproject/routeservice/RouteStoreAdapter.java
@@ -31,11 +31,21 @@
}
@Override
+ public void updateRoutes(Collection<Route> routes) {
+
+ }
+
+ @Override
public void removeRoute(Route route) {
}
@Override
+ public void removeRoutes(Collection<Route> routes) {
+
+ }
+
+ @Override
public void replaceRoute(Route route) {
}
@@ -56,6 +66,11 @@
}
@Override
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
+ return null;
+ }
+
+ @Override
public RouteSet getRoutes(IpPrefix prefix) {
return null;
}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
index 725a7a9..51ef8c7 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
@@ -17,7 +17,6 @@
package org.onosproject.routeservice.impl;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
import com.googlecode.concurrenttrees.common.KeyValuePair;
import com.googlecode.concurrenttrees.radix.node.concrete.DefaultByteArrayNodeFactory;
import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
@@ -121,7 +120,7 @@
routeTable = new ConcurrentInvertedRadixTree<>(
new DefaultByteArrayNodeFactory());
- alternativeRoutes = Maps.newHashMap();
+ alternativeRoutes = new ConcurrentHashMap<>();
}
/**
@@ -133,7 +132,6 @@
public RouteEvent update(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
Set<ResolvedRoute> immutableAlternatives = checkAlternatives(route, alternatives);
- synchronized (this) {
ResolvedRoute oldRoute = routeTable.put(createBinaryString(route.prefix()), route);
Set<ResolvedRoute> oldRoutes = alternativeRoutes.put(route.prefix(), immutableAlternatives);
@@ -153,7 +151,6 @@
}
return null;
- }
}
/**
@@ -181,18 +178,16 @@
* @param prefix prefix to remove
*/
public RouteEvent remove(IpPrefix prefix) {
- synchronized (this) {
- String key = createBinaryString(prefix);
+ String key = createBinaryString(prefix);
- ResolvedRoute route = routeTable.getValueForExactKey(key);
- Set<ResolvedRoute> alternatives = alternativeRoutes.remove(prefix);
+ ResolvedRoute route = routeTable.getValueForExactKey(key);
+ Set<ResolvedRoute> alternatives = alternativeRoutes.remove(prefix);
- if (route != null) {
- routeTable.remove(key);
- return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route, alternatives);
- }
- return null;
+ if (route != null) {
+ routeTable.remove(key);
+ return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route, alternatives);
}
+ return null;
}
/**
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
index 72d5453..25a74ae 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onlab.util.PredictableExecutor;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.Host;
import org.onosproject.net.host.HostEvent;
@@ -32,7 +33,6 @@
import org.onosproject.routeservice.RouteInfo;
import org.onosproject.routeservice.RouteListener;
import org.onosproject.routeservice.RouteService;
-import org.onosproject.routeservice.RouteSet;
import org.onosproject.routeservice.RouteStore;
import org.onosproject.routeservice.RouteStoreDelegate;
import org.onosproject.routeservice.RouteTableId;
@@ -45,16 +45,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -71,6 +67,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final int DEFAULT_BUCKETS = 0;
+
private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
private InternalHostListener hostListener = new InternalHostListener();
@@ -90,18 +88,21 @@
private RouteMonitor routeMonitor;
- @GuardedBy(value = "this")
- private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
+ protected RouteResolver routeResolver;
+
+ private Map<RouteListener, ListenerQueue> listeners = new ConcurrentHashMap<>();
private ThreadFactory threadFactory;
- protected Executor hostEventExecutor = newSingleThreadExecutor(
- groupedThreads("rm-event-host", "%d", log));
+ protected PredictableExecutor hostEventExecutors;
@Activate
protected void activate() {
routeMonitor = new RouteMonitor(this, clusterService, storageService);
+ routeResolver = new RouteResolver(this, hostService);
threadFactory = groupedThreads("onos/route", "listener-%d", log);
+ hostEventExecutors = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-manager",
+ "event-host-%d", log));
resolvedRouteStore = new DefaultResolvedRouteStore();
@@ -110,15 +111,14 @@
routeStore.getRouteTables().stream()
.flatMap(id -> routeStore.getRoutes(id).stream())
- .forEach(this::resolve);
+ .forEach(routeSet -> routeResolver.resolve(routeSet));
}
@Deactivate
protected void deactivate() {
routeMonitor.shutdown();
- synchronized (this) {
- listeners.values().forEach(ListenerQueue::stop);
- }
+ routeResolver.shutdown();
+ listeners.values().forEach(ListenerQueue::stop);
routeStore.unsetDelegate(delegate);
hostService.removeListener(hostListener);
@@ -136,8 +136,9 @@
*/
@Override
public void addListener(RouteListener listener) {
- synchronized (this) {
- log.debug("Synchronizing current routes to new listener");
+ log.debug("Synchronizing current routes to new listener");
+ ListenerQueue listenerQueue = listeners.compute(listener, (key, value) -> {
+ // Create listener regardless the existence of a previous value
ListenerQueue l = createListenerQueue(listener);
resolvedRouteStore.getRouteTables().stream()
.map(resolvedRouteStore::getRoutes)
@@ -145,21 +146,18 @@
.map(route -> new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
resolvedRouteStore.getAllRoutes(route.prefix())))
.forEach(l::post);
-
- listeners.put(listener, l);
-
- l.start();
- log.debug("Route synchronization complete");
- }
+ return l;
+ });
+ // Start draining the events
+ listenerQueue.start();
+ log.debug("Route synchronization complete");
}
@Override
public void removeListener(RouteListener listener) {
- synchronized (this) {
- ListenerQueue l = listeners.remove(listener);
- if (l != null) {
- l.stop();
- }
+ ListenerQueue l = listeners.remove(listener);
+ if (l != null) {
+ l.stop();
}
}
@@ -171,16 +169,10 @@
private void post(RouteEvent event) {
if (event != null) {
log.debug("Sending event {}", event);
- synchronized (this) {
- listeners.values().forEach(l -> l.post(event));
- }
+ listeners.values().forEach(l -> l.post(event));
}
}
- private Collection<Route> reformatRoutes(Collection<RouteSet> routeSets) {
- return routeSets.stream().flatMap(r -> r.routes().stream()).collect(Collectors.toList());
- }
-
@Override
public Collection<RouteTableId> getRouteTables() {
return routeStore.getRouteTables();
@@ -190,24 +182,11 @@
public Collection<RouteInfo> getRoutes(RouteTableId id) {
return routeStore.getRoutes(id).stream()
.map(routeSet -> new RouteInfo(routeSet.prefix(),
- resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null), resolveRouteSet(routeSet)))
+ resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null),
+ routeResolver.resolveRouteSet(routeSet)))
.collect(Collectors.toList());
}
- private Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
- return routeSet.routes().stream()
- .map(this::tryResolve)
- .collect(Collectors.toSet());
- }
-
- private ResolvedRoute tryResolve(Route route) {
- ResolvedRoute resolvedRoute = resolve(route);
- if (resolvedRoute == null) {
- resolvedRoute = new ResolvedRoute(route, null, null);
- }
- return resolvedRoute;
- }
-
@Override
public Collection<ResolvedRoute> getResolvedRoutes(RouteTableId id) {
return resolvedRouteStore.getRoutes(id);
@@ -225,22 +204,14 @@
@Override
public void update(Collection<Route> routes) {
- synchronized (this) {
- routes.forEach(route -> {
- log.debug("Received update {}", route);
- routeStore.updateRoute(route);
- });
- }
+ log.debug("Received update {}", routes);
+ routeStore.updateRoutes(routes);
}
@Override
public void withdraw(Collection<Route> routes) {
- synchronized (this) {
- routes.forEach(route -> {
- log.debug("Received withdraw {}", route);
- routeStore.removeRoute(route);
- });
- }
+ log.debug("Received withdraw {}", routes);
+ routeStore.removeRoutes(routes);
}
@Override
@@ -250,48 +221,14 @@
.orElse(null);
}
- private ResolvedRoute resolve(Route route) {
- hostService.startMonitoringIp(route.nextHop());
- Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
-
- return hosts.stream().findFirst()
- .map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
- .orElse(null);
- }
-
- private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
- return Comparator.comparing(ResolvedRoute::nextHop)
- .compare(route1, route2) <= 0 ? route1 : route2;
- }
-
- private void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+ void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
post(resolvedRouteStore.updateRoute(route, alternatives));
}
- private void remove(IpPrefix prefix) {
+ void remove(IpPrefix prefix) {
post(resolvedRouteStore.removeRoute(prefix));
}
- private void resolve(RouteSet routes) {
- if (routes.routes() == null) {
- // The routes were removed before we got to them, nothing to do
- return;
- }
- Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
- .map(this::resolve)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
-
- Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
- .reduce(this::decide);
-
- if (bestRoute.isPresent()) {
- store(bestRoute.get(), resolvedRoutes);
- } else {
- remove(routes.prefix());
- }
- }
-
private void hostUpdated(Host host) {
hostChanged(host);
}
@@ -301,12 +238,8 @@
}
private void hostChanged(Host host) {
- synchronized (this) {
- host.ipAddresses().stream()
- .flatMap(ip -> routeStore.getRoutesForNextHop(ip).stream())
- .map(route -> routeStore.getRoutes(route.prefix()))
- .forEach(this::resolve);
- }
+ routeStore.getRoutesForNextHops(host.ipAddresses())
+ .forEach(routeSet -> routeResolver.resolve(routeSet));
}
/**
@@ -377,10 +310,10 @@
public void notify(InternalRouteEvent event) {
switch (event.type()) {
case ROUTE_ADDED:
- resolve(event.subject());
+ routeResolver.resolve(event.subject());
break;
case ROUTE_REMOVED:
- resolve(event.subject());
+ routeResolver.resolve(event.subject());
break;
default:
break;
@@ -399,11 +332,11 @@
case HOST_UPDATED:
case HOST_MOVED:
log.trace("Scheduled host event {}", event);
- hostEventExecutor.execute(() -> hostUpdated(event.subject()));
+ hostEventExecutors.execute(() -> hostUpdated(event.subject()), event.subject().id().hashCode());
break;
case HOST_REMOVED:
log.trace("Scheduled host event {}", event);
- hostEventExecutor.execute(() -> hostRemoved(event.subject()));
+ hostEventExecutors.execute(() -> hostRemoved(event.subject()), event.subject().id().hashCode());
break;
default:
break;
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteResolver.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteResolver.java
new file mode 100644
index 0000000..5925904
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteResolver.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.onlab.util.PredictableExecutor;
+import org.onosproject.net.Host;
+import org.onosproject.net.host.HostService;
+import org.onosproject.routeservice.ResolvedRoute;
+import org.onosproject.routeservice.Route;
+import org.onosproject.routeservice.RouteSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * Resolves routes in multi-thread fashion.
+ */
+class RouteResolver {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private static final int DEFAULT_BUCKETS = 0;
+ private RouteManager routeManager;
+ private HostService hostService;
+ protected PredictableExecutor routeResolvers;
+
+ /**
+ * Creates a new route resolver.
+ *
+ * @param routeManager route service
+ * @param hostService host service
+ */
+ RouteResolver(RouteManager routeManager, HostService hostService) {
+ this.routeManager = routeManager;
+ this.hostService = hostService;
+ routeResolvers = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-resolver",
+ "route-resolver-%d", log));
+ }
+
+ /**
+ * Shuts down the route resolver.
+ */
+ void shutdown() {
+ routeResolvers.shutdown();
+ }
+
+ private ResolvedRoute tryResolve(Route route) {
+ ResolvedRoute resolvedRoute = resolve(route);
+ if (resolvedRoute == null) {
+ resolvedRoute = new ResolvedRoute(route, null, null);
+ }
+ return resolvedRoute;
+ }
+
+ // Used by external reads
+ Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
+ return routeSet.routes().stream()
+ .map(this::tryResolve)
+ .collect(Collectors.toSet());
+ }
+
+ // Used by external reads and by resolvers
+ ResolvedRoute resolve(Route route) {
+ hostService.startMonitoringIp(route.nextHop());
+ Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
+
+ return hosts.stream().findFirst()
+ .map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
+ .orElse(null);
+ }
+
+ private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
+ return Comparator.comparing(ResolvedRoute::nextHop)
+ .compare(route1, route2) <= 0 ? route1 : route2;
+ }
+
+ private void resolveInternal(RouteSet routes) {
+ if (routes.routes() == null) {
+ // The routes were removed before we got to them, nothing to do
+ return;
+ }
+
+ Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
+ .map(this::resolve)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
+ .reduce(this::decide);
+
+ if (bestRoute.isPresent()) {
+ routeManager.store(bestRoute.get(), resolvedRoutes);
+ } else {
+ routeManager.remove(routes.prefix());
+ }
+ }
+
+ // Offload to the resolvers using prefix hashcode as hint
+ // TODO Remove RouteManager reference using PickyCallable
+ void resolve(RouteSet routes) {
+ routeResolvers.execute(() -> resolveInternal(routes), routes.prefix().hashCode());
+ }
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DefaultRouteTable.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DefaultRouteTable.java
index f052f04..c24f005 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DefaultRouteTable.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DefaultRouteTable.java
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
@@ -43,6 +44,7 @@
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
+
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -133,6 +135,14 @@
}
@Override
+ public void update(Collection<Route> routesAdded) {
+ Map<String, Collection<? extends RawRoute>> computedRoutes = new HashMap<>();
+ computeRoutesToAdd(routesAdded).forEach((prefix, routes) -> computedRoutes.computeIfAbsent(
+ prefix, k -> Sets.newHashSet(routes)));
+ routes.putAll(computedRoutes);
+ }
+
+ @Override
public void remove(Route route) {
getRoutes(route.prefix())
.routes()
@@ -145,6 +155,14 @@
}
@Override
+ public void remove(Collection<Route> routesRemoved) {
+ Map<String, Collection<? extends RawRoute>> computedRoutes = new HashMap<>();
+ computeRoutesToRemove(routesRemoved).forEach((prefix, routes) -> computedRoutes.computeIfAbsent(
+ prefix, k -> Sets.newHashSet(routes)));
+ routes.removeAll(computedRoutes);
+ }
+
+ @Override
public void replace(Route route) {
routes.replaceValues(route.prefix().toString(), Sets.newHashSet(new RawRoute(route)));
}
@@ -180,6 +198,53 @@
.collect(Collectors.toSet());
}
+ @Override
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
+ // First create a reduced snapshot of the store iterating one time the map
+ Map<String, Collection<? extends RawRoute>> filteredRouteStore = new HashMap<>();
+ routes.values().stream()
+ .filter(r -> nextHops.contains(IpAddress.valueOf(r.nextHop())))
+ .forEach(r -> filteredRouteStore.computeIfAbsent(r.prefix, k -> {
+ // We need to get all the routes because the resolve logic
+ // will use the alternatives as well
+ Versioned<Collection<? extends RawRoute>> routeSet = routes.get(k);
+ if (routeSet != null) {
+ return routeSet.value();
+ }
+ return null;
+ }));
+ // Return the collection of the routeSet we have to resolve
+ return filteredRouteStore.entrySet().stream()
+ .map(entry -> new RouteSet(id, IpPrefix.valueOf(entry.getKey()),
+ entry.getValue().stream().map(RawRoute::route).collect(Collectors.toSet())))
+ .collect(Collectors.toSet());
+ }
+
+ private Map<String, Collection<RawRoute>> computeRoutesToAdd(Collection<Route> routesAdded) {
+ Map<String, Collection<RawRoute>> computedRoutes = new HashMap<>();
+ routesAdded.forEach(route -> {
+ Collection<RawRoute> tempRoutes = computedRoutes.computeIfAbsent(
+ route.prefix().toString(), k -> Sets.newHashSet());
+ tempRoutes.add(new RawRoute(route));
+ });
+ return computedRoutes;
+ }
+
+ private Map<String, Collection<RawRoute>> computeRoutesToRemove(Collection<Route> routesRemoved) {
+ Map<String, Collection<RawRoute>> computedRoutes = new HashMap<>();
+ routesRemoved.forEach(route -> getRoutes(route.prefix())
+ .routes()
+ .stream()
+ .filter(r -> r.equals(route))
+ .findAny()
+ .ifPresent(matchRoute -> {
+ Collection<RawRoute> tempRoutes = computedRoutes.computeIfAbsent(
+ matchRoute.prefix().toString(), k -> Sets.newHashSet());
+ tempRoutes.add(new RawRoute(matchRoute));
+ }));
+ return computedRoutes;
+ }
+
private class RouteTableListener
implements MultimapEventListener<String, RawRoute> {
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DistributedRouteStore.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DistributedRouteStore.java
index efdb792..f2661a1 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DistributedRouteStore.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/DistributedRouteStore.java
@@ -17,6 +17,7 @@
package org.onosproject.routeservice.store;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
@@ -37,11 +38,13 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
@@ -116,11 +119,27 @@
}
@Override
+ public void updateRoutes(Collection<Route> routes) {
+ Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
+ computedTables.forEach(
+ ((routeTableId, routesToAdd) -> getDefaultRouteTable(routeTableId).update(routesToAdd))
+ );
+ }
+
+ @Override
public void removeRoute(Route route) {
getDefaultRouteTable(route).remove(route);
}
@Override
+ public void removeRoutes(Collection<Route> routes) {
+ Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
+ computedTables.forEach(
+ ((routeTableId, routesToRemove) -> getDefaultRouteTable(routeTableId).remove(routesToRemove))
+ );
+ }
+
+ @Override
public void replaceRoute(Route route) {
getDefaultRouteTable(route).replace(route);
}
@@ -146,6 +165,15 @@
}
@Override
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
+ Map<RouteTableId, Set<IpAddress>> computedTables = computeRouteTablesFromIps(nextHops);
+ return computedTables.entrySet().stream()
+ .map(entry -> getDefaultRouteTable(entry.getKey()).getRoutesForNextHops(entry.getValue()))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ @Override
public RouteSet getRoutes(IpPrefix prefix) {
return getDefaultRouteTable(prefix.address()).getRoutes(prefix);
}
@@ -170,6 +198,30 @@
return routeTables.getOrDefault(routeTableId, EmptyRouteTable.instance());
}
+ private RouteTable getDefaultRouteTable(RouteTableId routeTableId) {
+ return routeTables.getOrDefault(routeTableId, EmptyRouteTable.instance());
+ }
+
+ private Map<RouteTableId, Set<Route>> computeRouteTablesFromRoutes(Collection<Route> routes) {
+ Map<RouteTableId, Set<Route>> computedTables = new HashMap<>();
+ routes.forEach(route -> {
+ RouteTableId routeTableId = (route.prefix().address().isIp4()) ? IPV4 : IPV6;
+ Set<Route> tempRoutes = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
+ tempRoutes.add(route);
+ });
+ return computedTables;
+ }
+
+ private Map<RouteTableId, Set<IpAddress>> computeRouteTablesFromIps(Collection<IpAddress> ipAddresses) {
+ Map<RouteTableId, Set<IpAddress>> computedTables = new HashMap<>();
+ ipAddresses.forEach(ipAddress -> {
+ RouteTableId routeTableId = (ipAddress.isIp4()) ? IPV4 : IPV6;
+ Set<IpAddress> tempIpAddresses = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
+ tempIpAddresses.add(ipAddress);
+ });
+ return computedTables;
+ }
+
private class InternalRouteStoreDelegate implements RouteStoreDelegate {
@Override
public void notify(InternalRouteEvent event) {
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/EmptyRouteTable.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/EmptyRouteTable.java
index 8bee10e..9201c1a 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/EmptyRouteTable.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/EmptyRouteTable.java
@@ -52,11 +52,21 @@
}
@Override
+ public void update(Collection<Route> routes) {
+
+ }
+
+ @Override
public void remove(Route route) {
}
@Override
+ public void remove(Collection<Route> routes) {
+
+ }
+
+ @Override
public void replace(Route route) {
}
@@ -82,6 +92,11 @@
}
@Override
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
+ return Collections.emptyList();
+ }
+
+ @Override
public void shutdown() {
}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/LocalRouteStore.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/LocalRouteStore.java
index 04b0f38..c286ab1 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/LocalRouteStore.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/LocalRouteStore.java
@@ -16,6 +16,7 @@
package org.onosproject.routeservice.store;
+import com.google.common.collect.Sets;
import com.googlecode.concurrenttrees.common.KeyValuePair;
import com.googlecode.concurrenttrees.radix.node.concrete.DefaultByteArrayNodeFactory;
import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
@@ -35,6 +36,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -82,11 +84,27 @@
}
@Override
+ public void updateRoutes(Collection<Route> routes) {
+ Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
+ computedTables.forEach(
+ ((routeTableId, routesToAdd) -> getDefaultRouteTable(routeTableId).update(routesToAdd))
+ );
+ }
+
+ @Override
public void removeRoute(Route route) {
getDefaultRouteTable(route).remove(route);
}
@Override
+ public void removeRoutes(Collection<Route> routes) {
+ Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
+ computedTables.forEach(
+ ((routeTableId, routesToRemove) -> getDefaultRouteTable(routeTableId).remove(routesToRemove))
+ );
+ }
+
+ @Override
public void replaceRoute(Route route) {
getDefaultRouteTable(route).replace(route);
}
@@ -111,6 +129,15 @@
}
@Override
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
+ Map<RouteTableId, Set<IpAddress>> computedTables = computeRouteTablesFromIps(nextHops);
+ return computedTables.entrySet().stream()
+ .map(entry -> getDefaultRouteTable(entry.getKey()).getRoutesForNextHops(entry.getValue()))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ @Override
public RouteSet getRoutes(IpPrefix prefix) {
return getDefaultRouteTable(prefix.address()).getRoutes(prefix);
}
@@ -124,6 +151,30 @@
return routeTables.get(routeTableId);
}
+ private RouteTable getDefaultRouteTable(RouteTableId routeTableId) {
+ return routeTables.get(routeTableId);
+ }
+
+ private Map<RouteTableId, Set<Route>> computeRouteTablesFromRoutes(Collection<Route> routes) {
+ Map<RouteTableId, Set<Route>> computedTables = new HashMap<>();
+ routes.forEach(route -> {
+ RouteTableId routeTableId = (route.prefix().address().isIp4()) ? IPV4 : IPV6;
+ Set<Route> tempRoutes = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
+ tempRoutes.add(route);
+ });
+ return computedTables;
+ }
+
+ private Map<RouteTableId, Set<IpAddress>> computeRouteTablesFromIps(Collection<IpAddress> ipAddresses) {
+ Map<RouteTableId, Set<IpAddress>> computedTables = new HashMap<>();
+ ipAddresses.forEach(ipAddress -> {
+ RouteTableId routeTableId = (ipAddress.isIp4()) ? IPV4 : IPV6;
+ Set<IpAddress> tempIpAddresses = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
+ tempIpAddresses.add(ipAddress);
+ });
+ return computedTables;
+ }
+
/**
* Route table into which routes can be placed.
*/
@@ -163,6 +214,17 @@
}
/**
+ * Adds or updates the routes in the route table.
+ *
+ * @param routes routes to update
+ */
+ public void update(Collection<Route> routes) {
+ synchronized (this) {
+ routes.forEach(this::update);
+ }
+ }
+
+ /**
* Removes the route from the route table.
*
* @param route route to remove
@@ -180,6 +242,17 @@
}
/**
+ * Adds or updates the routes in the route table.
+ *
+ * @param routes routes to update
+ */
+ public void remove(Collection<Route> routes) {
+ synchronized (this) {
+ routes.forEach(this::remove);
+ }
+ }
+
+ /**
* Replace the route in the route table.
*/
public void replace(Route route) {
@@ -199,6 +272,28 @@
.collect(Collectors.toSet());
}
+ /**
+ * Returns the routes pointing to the next hops.
+ *
+ * @param ips next hops IP addresses
+ * @return routes for the next hop
+ */
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> ips) {
+ // First create a reduced snapshot of the store iterating one time the map
+ Map<IpPrefix, Set<Route>> filteredRouteStore = new HashMap<>();
+ routes.values().stream()
+ .filter(r -> ips.contains(r.nextHop()))
+ .forEach(r -> {
+ Collection<Route> tempRoutes = filteredRouteStore.computeIfAbsent(
+ r.prefix(), k -> Sets.newHashSet());
+ tempRoutes.add(r);
+ });
+ // Return the collection of the routeSet we have to resolve
+ return filteredRouteStore.entrySet().stream()
+ .map(entry -> new RouteSet(id, entry.getKey(), entry.getValue()))
+ .collect(Collectors.toSet());
+ }
+
public RouteSet getRoutes(IpPrefix prefix) {
Route route = routes.get(prefix);
if (route != null) {
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteStoreImpl.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteStoreImpl.java
index edd22c3..f5c9d04 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteStoreImpl.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteStoreImpl.java
@@ -141,11 +141,21 @@
}
@Override
+ public void updateRoutes(Collection<Route> routes) {
+ currentRouteStore.updateRoutes(routes);
+ }
+
+ @Override
public void removeRoute(Route route) {
currentRouteStore.removeRoute(route);
}
@Override
+ public void removeRoutes(Collection<Route> routes) {
+ currentRouteStore.removeRoutes(routes);
+ }
+
+ @Override
public void replaceRoute(Route route) {
currentRouteStore.replaceRoute(route);
}
@@ -166,6 +176,11 @@
}
@Override
+ public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> ips) {
+ return currentRouteStore.getRoutesForNextHops(ips);
+ }
+
+ @Override
public RouteSet getRoutes(IpPrefix prefix) {
return currentRouteStore.getRoutes(prefix);
}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteTable.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteTable.java
index 130654b..fddd116 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteTable.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/store/RouteTable.java
@@ -37,6 +37,13 @@
void update(Route route);
/**
+ * Adds the routes to the route table.
+ *
+ * @param routes routes
+ */
+ void update(Collection<Route> routes);
+
+ /**
* Removes a route from the route table.
*
* @param route route
@@ -44,6 +51,13 @@
void remove(Route route);
/**
+ * Removes the routes from the route table.
+ *
+ * @param routes routes
+ */
+ void remove(Collection<Route> routes);
+
+ /**
* Replaces a route in the route table.
*
* @param route route
@@ -81,6 +95,14 @@
Collection<Route> getRoutesForNextHop(IpAddress nextHop);
/**
+ * Returns all routes that have the given next hops.
+ *
+ * @param nextHops next hops IP addresses
+ * @return collection of routes sets
+ */
+ Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops);
+
+ /**
* Releases route table resources held locally.
*/
void shutdown();
diff --git a/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java b/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java
index 3c02572..d075b32 100644
--- a/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java
+++ b/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java
@@ -19,7 +19,7 @@
import java.util.Collection;
import java.util.Collections;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.Ip4Address;
@@ -30,6 +30,7 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteEvent;
@@ -65,6 +66,7 @@
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
+import static org.onlab.util.Tools.groupedThreads;
/**
* Unit tests for the route manager.
@@ -104,7 +106,6 @@
routeManager = new TestRouteManager();
routeManager.hostService = hostService;
- routeManager.hostEventExecutor = MoreExecutors.directExecutor();
routeManager.clusterService = createNiceMock(ClusterService.class);
replay(routeManager.clusterService);
@@ -130,6 +131,11 @@
routeManager.routeStore = routeStore;
routeManager.activate();
+ routeManager.hostEventExecutors = new PredictableExecutor(
+ 0, groupedThreads("onos/route-manager-test", "event-host-%d"), true);
+ routeManager.routeResolver.routeResolvers = new PredictableExecutor(
+ 0, groupedThreads("onos/route-resolver-test", "route-resolver-%d"), true);
+
routeManager.addListener(routeListener);
}
@@ -142,16 +148,16 @@
hostService.addListener(anyObject(HostListener.class));
expectLastCall().andDelegateTo(new TestHostService()).anyTimes();
- Host host1 = createHost(MAC1, V4_NEXT_HOP1);
+ Host host1 = createHost(MAC1, Collections.singletonList(V4_NEXT_HOP1));
expectHost(host1);
- Host host2 = createHost(MAC2, V4_NEXT_HOP2);
+ Host host2 = createHost(MAC2, Collections.singletonList(V4_NEXT_HOP2));
expectHost(host2);
- Host host3 = createHost(MAC3, V6_NEXT_HOP1);
+ Host host3 = createHost(MAC3, Collections.singletonList(V6_NEXT_HOP1));
expectHost(host3);
- Host host4 = createHost(MAC4, V6_NEXT_HOP2);
+ Host host4 = createHost(MAC4, Collections.singletonList(V6_NEXT_HOP2));
expectHost(host4);
replay(hostService);
@@ -176,13 +182,13 @@
* Creates a host with the given parameters.
*
* @param macAddress MAC address
- * @param ipAddress IP address
+ * @param ipAddresses IP addresses
* @return new host
*/
- private Host createHost(MacAddress macAddress, IpAddress ipAddress) {
+ private Host createHost(MacAddress macAddress, Collection<IpAddress> ipAddresses) {
return new DefaultHost(ProviderId.NONE, HostId.NONE, macAddress,
VlanId.NONE, new HostLocation(CP1, 1),
- Sets.newHashSet(ipAddress));
+ Sets.newHashSet(ipAddresses));
}
/**
@@ -343,12 +349,19 @@
@Test
public void testAsyncRouteAdd() {
Route route = new Route(Route.Source.STATIC, V4_PREFIX1, V4_NEXT_HOP1);
+ // 2nd route for the same nexthop
+ Route route2 = new Route(Route.Source.STATIC, V4_PREFIX2, V4_NEXT_HOP2);
+ // 3rd route with no valid nexthop
+ Route route3 = new Route(Route.Source.STATIC, V6_PREFIX1, V6_NEXT_HOP1);
+
// Host service will reply with no hosts when asked
reset(hostService);
expect(hostService.getHostsByIp(anyObject(IpAddress.class))).andReturn(
Collections.emptySet()).anyTimes();
hostService.startMonitoringIp(V4_NEXT_HOP1);
+ hostService.startMonitoringIp(V4_NEXT_HOP2);
+ hostService.startMonitoringIp(V6_NEXT_HOP1);
expectLastCall().anyTimes();
replay(hostService);
@@ -356,7 +369,7 @@
// the host is not known
replay(routeListener);
- routeManager.update(Collections.singleton(route));
+ routeManager.update(Lists.newArrayList(route, route2, route3));
verify(routeListener);
@@ -365,15 +378,21 @@
ResolvedRoute resolvedRoute = new ResolvedRoute(route, MAC1);
routeListener.event(event(RouteEvent.Type.ROUTE_ADDED, resolvedRoute, null,
Sets.newHashSet(resolvedRoute), null));
+ ResolvedRoute resolvedRoute2 = new ResolvedRoute(route2, MAC1);
+ routeListener.event(event(RouteEvent.Type.ROUTE_ADDED, resolvedRoute2, null,
+ Sets.newHashSet(resolvedRoute2), null));
replay(routeListener);
- Host host = createHost(MAC1, V4_NEXT_HOP1);
+ Host host = createHost(MAC1, Lists.newArrayList(V4_NEXT_HOP1, V4_NEXT_HOP2));
// Set up the host service with a host
reset(hostService);
expect(hostService.getHostsByIp(V4_NEXT_HOP1)).andReturn(
Collections.singleton(host)).anyTimes();
hostService.startMonitoringIp(V4_NEXT_HOP1);
+ expect(hostService.getHostsByIp(V4_NEXT_HOP2)).andReturn(
+ Collections.singleton(host)).anyTimes();
+ hostService.startMonitoringIp(V4_NEXT_HOP2);
expectLastCall().anyTimes();
replay(hostService);