Add StatusChangedListener for maps in RouteTables
Change-Id: I02c8558567ac416ea62fea79f856c331da7282ad
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java
index e9537d9..1231abe 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java
@@ -26,6 +26,7 @@
import org.onosproject.incubator.net.routing.RouteTableId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
@@ -36,6 +37,8 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -48,27 +51,43 @@
private final RouteTableId id;
private final ConsistentMap<IpPrefix, Set<Route>> routes;
private final RouteStoreDelegate delegate;
+ private final ExecutorService executor;
private final RouteTableListener listener = new RouteTableListener();
+ private final Consumer<DistributedPrimitive.Status> statusChangeListener;
+
/**
* Creates a new route table.
*
* @param id route table ID
* @param delegate route store delegate to notify of events
* @param storageService storage service
+ * @param executor executor service
*/
public DefaultRouteTable(RouteTableId id, RouteStoreDelegate delegate,
- StorageService storageService) {
+ StorageService storageService, ExecutorService executor) {
this.delegate = checkNotNull(delegate);
this.id = checkNotNull(id);
this.routes = buildRouteMap(checkNotNull(storageService));
+ this.executor = checkNotNull(executor);
+ statusChangeListener = status -> {
+ if (status.equals(DistributedPrimitive.Status.ACTIVE)) {
+ executor.execute(this::notifyExistingRoutes);
+ }
+ };
+ routes.addStatusChangeListener(statusChangeListener);
+
+ notifyExistingRoutes();
+
+ routes.addListener(listener);
+ }
+
+ private void notifyExistingRoutes() {
routes.entrySet().stream()
.map(e -> new InternalRouteEvent(InternalRouteEvent.Type.ROUTE_ADDED,
new RouteSet(id, e.getKey(), e.getValue().value())))
.forEach(delegate::notify);
-
- routes.addListener(listener);
}
private ConsistentMap<IpPrefix, Set<Route>> buildRouteMap(StorageService storageService) {
@@ -91,6 +110,7 @@
@Override
public void shutdown() {
+ routes.removeStatusChangeListener(statusChangeListener);
routes.removeListener(listener);
}
@@ -186,4 +206,5 @@
}
}
}
+
}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java
index 6901041..34e281a 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java
@@ -44,6 +44,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.onlab.util.Tools.groupedThreads;
+
/**
* Route store based on distributed storage.
*/
@@ -75,7 +77,7 @@
*/
public void activate() {
routeTables = new ConcurrentHashMap<>();
- executor = Executors.newSingleThreadExecutor();
+ executor = Executors.newSingleThreadExecutor(groupedThreads("onos/route", "store", log));
KryoNamespace masterRouteTableSerializer = KryoNamespace.newBuilder()
.register(RouteTableId.class)
@@ -173,7 +175,7 @@
}
private void createRouteTable(RouteTableId tableId) {
- routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService));
+ routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService, executor));
}
private void destroyRouteTable(RouteTableId tableId) {