Add StatusChangedListener for maps in RouteTables

Change-Id: I02c8558567ac416ea62fea79f856c331da7282ad
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java
index e9537d9..1231abe 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java
@@ -26,6 +26,7 @@
 import org.onosproject.incubator.net.routing.RouteTableId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -36,6 +37,8 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -48,27 +51,43 @@
     private final RouteTableId id;
     private final ConsistentMap<IpPrefix, Set<Route>> routes;
     private final RouteStoreDelegate delegate;
+    private final ExecutorService executor;
     private final RouteTableListener listener = new RouteTableListener();
 
+    private final Consumer<DistributedPrimitive.Status> statusChangeListener;
+
     /**
      * Creates a new route table.
      *
      * @param id route table ID
      * @param delegate route store delegate to notify of events
      * @param storageService storage service
+     * @param executor executor service
      */
     public DefaultRouteTable(RouteTableId id, RouteStoreDelegate delegate,
-                             StorageService storageService) {
+                             StorageService storageService, ExecutorService executor) {
         this.delegate = checkNotNull(delegate);
         this.id = checkNotNull(id);
         this.routes = buildRouteMap(checkNotNull(storageService));
+        this.executor = checkNotNull(executor);
 
+        statusChangeListener = status -> {
+            if (status.equals(DistributedPrimitive.Status.ACTIVE)) {
+                executor.execute(this::notifyExistingRoutes);
+            }
+        };
+        routes.addStatusChangeListener(statusChangeListener);
+
+        notifyExistingRoutes();
+
+        routes.addListener(listener);
+    }
+
+    private void notifyExistingRoutes() {
         routes.entrySet().stream()
                 .map(e -> new InternalRouteEvent(InternalRouteEvent.Type.ROUTE_ADDED,
                         new RouteSet(id, e.getKey(), e.getValue().value())))
                 .forEach(delegate::notify);
-
-        routes.addListener(listener);
     }
 
     private ConsistentMap<IpPrefix, Set<Route>> buildRouteMap(StorageService storageService) {
@@ -91,6 +110,7 @@
 
     @Override
     public void shutdown() {
+        routes.removeStatusChangeListener(statusChangeListener);
         routes.removeListener(listener);
     }
 
@@ -186,4 +206,5 @@
             }
         }
     }
+
 }
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java
index 6901041..34e281a 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java
@@ -44,6 +44,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.onlab.util.Tools.groupedThreads;
+
 /**
  * Route store based on distributed storage.
  */
@@ -75,7 +77,7 @@
      */
     public void activate() {
         routeTables = new ConcurrentHashMap<>();
-        executor = Executors.newSingleThreadExecutor();
+        executor = Executors.newSingleThreadExecutor(groupedThreads("onos/route", "store", log));
 
         KryoNamespace masterRouteTableSerializer = KryoNamespace.newBuilder()
                 .register(RouteTableId.class)
@@ -173,7 +175,7 @@
     }
 
     private void createRouteTable(RouteTableId tableId) {
-        routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService));
+        routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService, executor));
     }
 
     private void destroyRouteTable(RouteTableId tableId) {