Move routing from incubator to a separate app
Change-Id: I961d10af99c572b1f8d9b3d37c6f52dd04422007
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ConfigurationRouteSource.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ConfigurationRouteSource.java
new file mode 100644
index 0000000..eb6733f
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ConfigurationRouteSource.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.routeservice.Route;
+import org.onosproject.routeservice.RouteAdminService;
+import org.onosproject.routeservice.RouteConfig;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.SubjectFactories;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Route source that installs static routes configured in the network configuration.
+ */
+@Component(immediate = true)
+public class ConfigurationRouteSource {
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigRegistry netcfgRegistry;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RouteAdminService routeService;
+
+ private final ConfigFactory<ApplicationId, RouteConfig> routeConfigFactory =
+ new ConfigFactory<ApplicationId, RouteConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY,
+ RouteConfig.class, "routes", true) {
+ @Override
+ public RouteConfig createConfig() {
+ return new RouteConfig();
+ }
+ };
+ private final InternalNetworkConfigListener netcfgListener =
+ new InternalNetworkConfigListener();
+
+ @Activate
+ protected void activate() {
+ netcfgRegistry.addListener(netcfgListener);
+ netcfgRegistry.registerConfigFactory(routeConfigFactory);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ netcfgRegistry.removeListener(netcfgListener);
+ netcfgRegistry.unregisterConfigFactory(routeConfigFactory);
+ }
+
+ private void processRouteConfigAdded(NetworkConfigEvent event) {
+ Set<Route> routes = ((RouteConfig) event.config().get()).getRoutes();
+ routeService.update(routes);
+ }
+
+ private void processRouteConfigUpdated(NetworkConfigEvent event) {
+ Set<Route> routes = ((RouteConfig) event.config().get()).getRoutes();
+ Set<Route> prevRoutes = ((RouteConfig) event.prevConfig().get()).getRoutes();
+ Set<Route> pendingRemove = prevRoutes.stream()
+ .filter(prevRoute -> routes.stream()
+ .noneMatch(route -> route.prefix().equals(prevRoute.prefix())))
+ .collect(Collectors.toSet());
+ Set<Route> pendingUpdate = routes.stream()
+ .filter(route -> !pendingRemove.contains(route)).collect(Collectors.toSet());
+ routeService.update(pendingUpdate);
+ routeService.withdraw(pendingRemove);
+ }
+
+ private void processRouteConfigRemoved(NetworkConfigEvent event) {
+ Set<Route> prevRoutes = ((RouteConfig) event.prevConfig().get()).getRoutes();
+ routeService.withdraw(prevRoutes);
+ }
+
+ private class InternalNetworkConfigListener implements
+ NetworkConfigListener {
+ @Override
+ public void event(NetworkConfigEvent event) {
+ if (event.configClass().equals(RouteConfig.class)) {
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ processRouteConfigAdded(event);
+ break;
+ case CONFIG_UPDATED:
+ processRouteConfigUpdated(event);
+ break;
+ case CONFIG_REMOVED:
+ processRouteConfigRemoved(event);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
new file mode 100644
index 0000000..34aeaa7
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/DefaultResolvedRouteStore.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.googlecode.concurrenttrees.common.KeyValuePair;
+import com.googlecode.concurrenttrees.radix.node.concrete.DefaultByteArrayNodeFactory;
+import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
+import com.googlecode.concurrenttrees.radixinverted.InvertedRadixTree;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.GuavaCollectors;
+import org.onlab.util.Tools;
+import org.onosproject.routeservice.ResolvedRoute;
+import org.onosproject.routeservice.RouteEvent;
+import org.onosproject.routeservice.RouteTableId;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.onosproject.routeservice.RouteTools.createBinaryString;
+
+/**
+ * Stores routes that have been resolved.
+ */
+public class DefaultResolvedRouteStore implements ResolvedRouteStore {
+
+ private Map<RouteTableId, RouteTable> routeTables;
+ private static final RouteTableId IPV4 = new RouteTableId("ipv4");
+ private static final RouteTableId IPV6 = new RouteTableId("ipv6");
+
+ /**
+ * Creates a new resolved route store.
+ */
+ public DefaultResolvedRouteStore() {
+ routeTables = new ConcurrentHashMap<>();
+
+ routeTables.put(IPV4, new RouteTable());
+ routeTables.put(IPV6, new RouteTable());
+ }
+
+ @Override
+ public RouteEvent updateRoute(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+ return getDefaultRouteTable(route).update(route, alternatives);
+ }
+
+ @Override
+ public RouteEvent removeRoute(IpPrefix prefix) {
+ RouteTable table = getDefaultRouteTable(prefix.address());
+ return table.remove(prefix);
+ }
+
+ @Override
+ public Set<RouteTableId> getRouteTables() {
+ return routeTables.keySet();
+ }
+
+ @Override
+ public Collection<ResolvedRoute> getRoutes(RouteTableId table) {
+ RouteTable routeTable = routeTables.get(table);
+ if (routeTable == null) {
+ return Collections.emptySet();
+ }
+ return routeTable.getRoutes();
+ }
+
+ @Override
+ public Optional<ResolvedRoute> getRoute(IpPrefix prefix) {
+ return getDefaultRouteTable(prefix.address()).getRoute(prefix);
+ }
+
+ @Override
+ public Collection<ResolvedRoute> getAllRoutes(IpPrefix prefix) {
+ return getDefaultRouteTable(prefix.address()).getAllRoutes(prefix);
+ }
+
+ @Override
+ public Optional<ResolvedRoute> longestPrefixMatch(IpAddress ip) {
+ return getDefaultRouteTable(ip).longestPrefixMatch(ip);
+ }
+
+ private RouteTable getDefaultRouteTable(ResolvedRoute route) {
+ return getDefaultRouteTable(route.prefix().address());
+ }
+
+ private RouteTable getDefaultRouteTable(IpAddress ip) {
+ RouteTableId routeTableId = (ip.isIp4()) ? IPV4 : IPV6;
+ return routeTables.get(routeTableId);
+ }
+
+ /**
+ * Route table into which routes can be placed.
+ */
+ private class RouteTable {
+ private final InvertedRadixTree<ResolvedRoute> routeTable;
+ private final Map<IpPrefix, Set<ResolvedRoute>> alternativeRoutes;
+
+ /**
+ * Creates a new route table.
+ */
+ public RouteTable() {
+ routeTable = new ConcurrentInvertedRadixTree<>(
+ new DefaultByteArrayNodeFactory());
+
+ alternativeRoutes = Maps.newHashMap();
+ }
+
+ /**
+ * Adds or updates the route in the route table.
+ *
+ * @param route route to update
+ * @param alternatives alternative routes
+ */
+ public RouteEvent update(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+ Set<ResolvedRoute> immutableAlternatives = checkAlternatives(route, alternatives);
+
+ synchronized (this) {
+ ResolvedRoute oldRoute = routeTable.put(createBinaryString(route.prefix()), route);
+ Set<ResolvedRoute> oldRoutes = alternativeRoutes.put(route.prefix(), immutableAlternatives);
+
+ if (!route.equals(oldRoute)) {
+ if (oldRoute == null) {
+ return new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
+ immutableAlternatives);
+ } else {
+ return new RouteEvent(RouteEvent.Type.ROUTE_UPDATED, route,
+ oldRoute, immutableAlternatives);
+ }
+ }
+
+ if (!immutableAlternatives.equals(oldRoutes)) {
+ return new RouteEvent(RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED,
+ route, immutableAlternatives);
+ }
+
+ return null;
+ }
+ }
+
+ /**
+ * Checks that the best route is present in the alternatives list and
+ * returns an immutable set of alternatives.
+ *
+ * @param route best route
+ * @param alternatives alternatives
+ * @return immutable set of alternative routes
+ */
+ private Set<ResolvedRoute> checkAlternatives(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+ if (!alternatives.contains(route)) {
+ return ImmutableSet.<ResolvedRoute>builder()
+ .addAll(alternatives)
+ .add(route)
+ .build();
+ } else {
+ return ImmutableSet.copyOf(alternatives);
+ }
+ }
+
+ /**
+ * Removes the route from the route table.
+ *
+ * @param prefix prefix to remove
+ */
+ public RouteEvent remove(IpPrefix prefix) {
+ synchronized (this) {
+ String key = createBinaryString(prefix);
+
+ ResolvedRoute route = routeTable.getValueForExactKey(key);
+ alternativeRoutes.remove(prefix);
+
+ if (route != null) {
+ routeTable.remove(key);
+ return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Returns all routes in the route table.
+ *
+ * @return all routes
+ */
+ public Collection<ResolvedRoute> getRoutes() {
+ return Tools.stream(routeTable.getKeyValuePairsForKeysStartingWith(""))
+ .map(KeyValuePair::getValue)
+ .collect(GuavaCollectors.toImmutableList());
+ }
+
+ /**
+ * Returns the best route for the given prefix, if one exists.
+ *
+ * @param prefix IP prefix
+ * @return best route
+ */
+ public Optional<ResolvedRoute> getRoute(IpPrefix prefix) {
+ return Optional.ofNullable(routeTable.getValueForExactKey(createBinaryString(prefix)));
+ }
+
+ public Collection<ResolvedRoute> getAllRoutes(IpPrefix prefix) {
+ return alternativeRoutes.getOrDefault(prefix, Collections.emptySet());
+ }
+
+ /**
+ * Performs a longest prefix match with the given IP in the route table.
+ *
+ * @param ip IP address to look up
+ * @return most specific prefix containing the given
+ */
+ public Optional<ResolvedRoute> longestPrefixMatch(IpAddress ip) {
+ return Tools.stream(routeTable.getValuesForKeysPrefixing(createBinaryString(ip.toIpPrefix())))
+ .reduce((a, b) -> b); // reduces to the last element in the stream
+ }
+ }
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ListenerQueue.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ListenerQueue.java
new file mode 100644
index 0000000..93e80d5
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ListenerQueue.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.onosproject.routeservice.RouteEvent;
+
+/**
+ * Queues updates for a route listener to ensure they are received in the
+ * correct order.
+ */
+interface ListenerQueue {
+
+ /**
+ * Posts an event to the listener.
+ *
+ * @param event event
+ */
+ void post(RouteEvent event);
+
+ /**
+ * Initiates event delivery to the listener.
+ */
+ void start();
+
+ /**
+ * Halts event delivery to the listener.
+ */
+ void stop();
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ResolvedRouteStore.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ResolvedRouteStore.java
new file mode 100644
index 0000000..a6db108
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/ResolvedRouteStore.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onosproject.routeservice.ResolvedRoute;
+import org.onosproject.routeservice.RouteEvent;
+import org.onosproject.routeservice.RouteTableId;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Stores resolved routes and best route decisions.
+ */
+public interface ResolvedRouteStore {
+
+ /**
+ * Adds or updates the best route for the given prefix.
+ *
+ * @param route new best route for this prefix
+ * @param alternatives alternative resolved routes
+ * @return event describing the change
+ */
+ RouteEvent updateRoute(ResolvedRoute route, Set<ResolvedRoute> alternatives);
+
+ /**
+ * Removes the best route for the given prefix.
+ *
+ * @param prefix IP prefix
+ * @return event describing the change
+ */
+ RouteEvent removeRoute(IpPrefix prefix);
+
+ /**
+ * Gets the set of route tables.
+ *
+ * @return set of route table IDs
+ */
+ Set<RouteTableId> getRouteTables();
+
+ /**
+ * Returns the best routes for a give route table.
+ *
+ * @param table route table ID
+ * @return collection of selected routes
+ */
+ Collection<ResolvedRoute> getRoutes(RouteTableId table);
+
+ /**
+ * Returns the best selected route for the given IP prefix.
+ *
+ * @param prefix IP prefix
+ * @return optional best route
+ */
+ Optional<ResolvedRoute> getRoute(IpPrefix prefix);
+
+ /**
+ * Returns all resolved routes stored for the given prefix, including the
+ * best selected route.
+ *
+ * @param prefix IP prefix to look up routes for
+ * @return all stored resolved routes for this prefix
+ */
+ Collection<ResolvedRoute> getAllRoutes(IpPrefix prefix);
+
+ /**
+ * Performs a longest prefix match of the best routes on the given IP address.
+ *
+ * @param ip IP address
+ * @return optional longest matching route
+ */
+ Optional<ResolvedRoute> longestPrefixMatch(IpAddress ip);
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
new file mode 100644
index 0000000..e5db364
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
@@ -0,0 +1,396 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.routeservice.InternalRouteEvent;
+import org.onosproject.routeservice.ResolvedRoute;
+import org.onosproject.routeservice.Route;
+import org.onosproject.routeservice.RouteAdminService;
+import org.onosproject.routeservice.RouteEvent;
+import org.onosproject.routeservice.RouteInfo;
+import org.onosproject.routeservice.RouteListener;
+import org.onosproject.routeservice.RouteService;
+import org.onosproject.routeservice.RouteSet;
+import org.onosproject.routeservice.RouteStore;
+import org.onosproject.routeservice.RouteStoreDelegate;
+import org.onosproject.routeservice.RouteTableId;
+import org.onosproject.net.Host;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostListener;
+import org.onosproject.net.host.HostService;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * Implementation of the unicast route service.
+ */
+@Service
+@Component
+public class RouteManager implements RouteService, RouteAdminService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
+ private InternalHostListener hostListener = new InternalHostListener();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RouteStore routeStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private ResolvedRouteStore resolvedRouteStore;
+
+ private RouteMonitor routeMonitor;
+
+ @GuardedBy(value = "this")
+ private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
+
+ private ThreadFactory threadFactory;
+
+ @Activate
+ protected void activate() {
+ routeMonitor = new RouteMonitor(this, clusterService, storageService);
+ threadFactory = groupedThreads("onos/route", "listener-%d", log);
+
+ resolvedRouteStore = new DefaultResolvedRouteStore();
+
+ routeStore.setDelegate(delegate);
+ hostService.addListener(hostListener);
+
+ routeStore.getRouteTables().stream()
+ .flatMap(id -> routeStore.getRoutes(id).stream())
+ .forEach(this::resolve);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ routeMonitor.shutdown();
+ listeners.values().forEach(ListenerQueue::stop);
+
+ routeStore.unsetDelegate(delegate);
+ hostService.removeListener(hostListener);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * In a departure from other services in ONOS, calling addListener will
+ * cause all current routes to be pushed to the listener before any new
+ * events are sent. This allows a listener to easily get the exact set of
+ * routes without worrying about missing any.
+ *
+ * @param listener listener to be added
+ */
+ @Override
+ public void addListener(RouteListener listener) {
+ synchronized (this) {
+ log.debug("Synchronizing current routes to new listener");
+ ListenerQueue l = createListenerQueue(listener);
+ resolvedRouteStore.getRouteTables().stream()
+ .map(resolvedRouteStore::getRoutes)
+ .flatMap(Collection::stream)
+ .map(route -> new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
+ resolvedRouteStore.getAllRoutes(route.prefix())))
+ .forEach(l::post);
+
+ listeners.put(listener, l);
+
+ l.start();
+ log.debug("Route synchronization complete");
+ }
+ }
+
+ @Override
+ public void removeListener(RouteListener listener) {
+ synchronized (this) {
+ ListenerQueue l = listeners.remove(listener);
+ if (l != null) {
+ l.stop();
+ }
+ }
+ }
+
+ /**
+ * Posts an event to all listeners.
+ *
+ * @param event event
+ */
+ private void post(RouteEvent event) {
+ if (event != null) {
+ log.debug("Sending event {}", event);
+ synchronized (this) {
+ listeners.values().forEach(l -> l.post(event));
+ }
+ }
+ }
+
+ private Collection<Route> reformatRoutes(Collection<RouteSet> routeSets) {
+ return routeSets.stream().flatMap(r -> r.routes().stream()).collect(Collectors.toList());
+ }
+
+ public Collection<RouteTableId> getRouteTables() {
+ return routeStore.getRouteTables();
+ }
+
+ @Override
+ public Collection<RouteInfo> getRoutes(RouteTableId id) {
+ return routeStore.getRoutes(id).stream()
+ .map(routeSet -> new RouteInfo(routeSet.prefix(),
+ resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null), resolveRouteSet(routeSet)))
+ .collect(Collectors.toList());
+ }
+
+ private Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
+ return routeSet.routes().stream()
+ .map(this::tryResolve)
+ .collect(Collectors.toSet());
+ }
+
+ private ResolvedRoute tryResolve(Route route) {
+ ResolvedRoute resolvedRoute = resolve(route);
+ if (resolvedRoute == null) {
+ resolvedRoute = new ResolvedRoute(route, null, null);
+ }
+ return resolvedRoute;
+ }
+
+ @Override
+ public Optional<ResolvedRoute> longestPrefixLookup(IpAddress ip) {
+ return resolvedRouteStore.longestPrefixMatch(ip);
+ }
+
+ @Override
+ public void update(Collection<Route> routes) {
+ synchronized (this) {
+ routes.forEach(route -> {
+ log.debug("Received update {}", route);
+ routeStore.updateRoute(route);
+ });
+ }
+ }
+
+ @Override
+ public void withdraw(Collection<Route> routes) {
+ synchronized (this) {
+ routes.forEach(route -> {
+ log.debug("Received withdraw {}", route);
+ routeStore.removeRoute(route);
+ });
+ }
+ }
+
+ @Override
+ public Route longestPrefixMatch(IpAddress ip) {
+ return longestPrefixLookup(ip)
+ .map(ResolvedRoute::route)
+ .orElse(null);
+ }
+
+ private ResolvedRoute resolve(Route route) {
+ hostService.startMonitoringIp(route.nextHop());
+ Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
+
+ Optional<Host> host = hosts.stream().findFirst();
+ if (host.isPresent()) {
+ return new ResolvedRoute(route, host.get().mac(), host.get().vlan(),
+ host.get().location());
+ } else {
+ return null;
+ }
+ }
+
+ private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
+ return Comparator.comparing(ResolvedRoute::nextHop)
+ .compare(route1, route2) <= 0 ? route1 : route2;
+ }
+
+ private void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
+ post(resolvedRouteStore.updateRoute(route, alternatives));
+ }
+
+ private void remove(IpPrefix prefix) {
+ post(resolvedRouteStore.removeRoute(prefix));
+ }
+
+ private void resolve(RouteSet routes) {
+ Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
+ .map(this::resolve)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
+ .reduce(this::decide);
+
+ if (bestRoute.isPresent()) {
+ store(bestRoute.get(), resolvedRoutes);
+ } else {
+ remove(routes.prefix());
+ }
+ }
+
+ private void hostUpdated(Host host) {
+ hostChanged(host);
+ }
+
+ private void hostRemoved(Host host) {
+ hostChanged(host);
+ }
+
+ private void hostChanged(Host host) {
+ synchronized (this) {
+ host.ipAddresses().stream()
+ .flatMap(ip -> routeStore.getRoutesForNextHop(ip).stream())
+ .map(route -> routeStore.getRoutes(route.prefix()))
+ .forEach(this::resolve);
+ }
+ }
+
+ /**
+ * Creates a new listener queue.
+ *
+ * @param listener route listener
+ * @return listener queue
+ */
+ ListenerQueue createListenerQueue(RouteListener listener) {
+ return new DefaultListenerQueue(listener);
+ }
+
+ /**
+ * Default route listener queue.
+ */
+ private class DefaultListenerQueue implements ListenerQueue {
+
+ private final ExecutorService executorService;
+ private final BlockingQueue<RouteEvent> queue;
+ private final RouteListener listener;
+
+ /**
+ * Creates a new listener queue.
+ *
+ * @param listener route listener to queue updates for
+ */
+ public DefaultListenerQueue(RouteListener listener) {
+ this.listener = listener;
+ queue = new LinkedBlockingQueue<>();
+ executorService = newSingleThreadExecutor(threadFactory);
+ }
+
+ @Override
+ public void post(RouteEvent event) {
+ queue.add(event);
+ }
+
+ @Override
+ public void start() {
+ executorService.execute(this::poll);
+ }
+
+ @Override
+ public void stop() {
+ executorService.shutdown();
+ }
+
+ private void poll() {
+ while (true) {
+ try {
+ listener.event(queue.take());
+ } catch (InterruptedException e) {
+ log.info("Route listener event thread shutting down: {}", e.getMessage());
+ break;
+ } catch (Exception e) {
+ log.warn("Exception during route event handler", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Delegate to receive events from the route store.
+ */
+ private class InternalRouteStoreDelegate implements RouteStoreDelegate {
+ @Override
+ public void notify(InternalRouteEvent event) {
+ switch (event.type()) {
+ case ROUTE_ADDED:
+ resolve(event.subject());
+ break;
+ case ROUTE_REMOVED:
+ resolve(event.subject());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ /**
+ * Internal listener for host events.
+ */
+ private class InternalHostListener implements HostListener {
+ @Override
+ public void event(HostEvent event) {
+ switch (event.type()) {
+ case HOST_ADDED:
+ case HOST_UPDATED:
+ hostUpdated(event.subject());
+ break;
+ case HOST_REMOVED:
+ hostRemoved(event.subject());
+ break;
+ case HOST_MOVED:
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
new file mode 100644
index 0000000..24d2aff
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routeservice.impl;
+
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.routeservice.ResolvedRoute;
+import org.onosproject.routeservice.Route;
+import org.onosproject.routeservice.RouteAdminService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WorkQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * Monitors cluster nodes and removes routes if a cluster node becomes unavailable.
+ */
+public class RouteMonitor {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private static final String TOPIC = "route-reaper";
+ private static final int NUM_PARALLEL_JOBS = 10;
+
+ private RouteAdminService routeService;
+ private final ClusterService clusterService;
+ private StorageService storageService;
+
+ private WorkQueue<NodeId> queue;
+
+ private final InternalClusterListener clusterListener = new InternalClusterListener();
+
+ private final ScheduledExecutorService reaperExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
+
+ /**
+ * Creates a new route monitor.
+ *
+ * @param routeService route service
+ * @param clusterService cluster service
+ * @param storageService storage service
+ */
+ public RouteMonitor(RouteAdminService routeService,
+ ClusterService clusterService, StorageService storageService) {
+ this.routeService = routeService;
+ this.clusterService = clusterService;
+ this.storageService = storageService;
+
+ clusterService.addListener(clusterListener);
+
+ queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API));
+ queue.addStatusChangeListener(this::statusChange);
+
+ startProcessing();
+ }
+
+ /**
+ * Shuts down the route monitor.
+ */
+ public void shutdown() {
+ stopProcessing();
+ clusterService.removeListener(clusterListener);
+ }
+
+ private void statusChange(DistributedPrimitive.Status status) {
+ switch (status) {
+ case ACTIVE:
+ startProcessing();
+ break;
+ case SUSPENDED:
+ stopProcessing();
+ break;
+ case INACTIVE:
+ default:
+ break;
+ }
+ }
+
+ private void startProcessing() {
+ queue.registerTaskProcessor(this::cleanRoutes, NUM_PARALLEL_JOBS, reaperExecutor);
+ }
+
+ private void stopProcessing() {
+ queue.stopProcessing();
+ }
+
+ private void cleanRoutes(NodeId node) {
+ log.info("Cleaning routes from unavailable node {}", node);
+
+ Collection<Route> routes = routeService.getRouteTables().stream()
+ .flatMap(id -> routeService.getRoutes(id).stream())
+ .flatMap(route -> route.allRoutes().stream())
+ .map(ResolvedRoute::route)
+ .filter(r -> r.sourceNode().equals(node))
+ .collect(Collectors.toList());
+
+ log.debug("Withdrawing routes: {}", routes);
+
+ routeService.withdraw(routes);
+ }
+
+ private class InternalClusterListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ switch (event.type()) {
+ case INSTANCE_DEACTIVATED:
+ NodeId id = event.subject().id();
+ log.info("Node {} deactivated", id);
+ queue.addOne(id);
+ break;
+ case INSTANCE_ADDED:
+ case INSTANCE_REMOVED:
+ case INSTANCE_ACTIVATED:
+ case INSTANCE_READY:
+ default:
+ break;
+ }
+ }
+ }
+
+}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/package-info.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/package-info.java
new file mode 100644
index 0000000..037bde4
--- /dev/null
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of route service.
+ */
+package org.onosproject.routeservice.impl;