Introduce DistributedLock to guarantee that minority nodes won't try to remove
routes that originated from majority nodes

Change-Id: I138236b77773df808ca1478a59dd44fd88e711b4
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
index 24d2aff..ede60db 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
@@ -19,11 +19,13 @@
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.routeservice.ResolvedRoute;
 import org.onosproject.routeservice.Route;
 import org.onosproject.routeservice.RouteAdminService;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
 import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
@@ -31,6 +33,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
@@ -46,12 +49,15 @@
     private final Logger log = LoggerFactory.getLogger(this.getClass());
 
     private static final String TOPIC = "route-reaper";
+    private static final String LOCK_NAME = "route-monitor-lock";
     private static final int NUM_PARALLEL_JOBS = 10;
 
     private RouteAdminService routeService;
     private final ClusterService clusterService;
     private StorageService storageService;
 
+    private final AsyncDistributedLock asyncLock;
+
     private WorkQueue<NodeId> queue;
 
     private final InternalClusterListener clusterListener = new InternalClusterListener();
@@ -72,6 +78,8 @@
         this.clusterService = clusterService;
         this.storageService = storageService;
 
+        asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
+
         clusterService.addListener(clusterListener);
 
         queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API));
@@ -86,6 +94,7 @@
     public void shutdown() {
         stopProcessing();
         clusterService.removeListener(clusterListener);
+        asyncLock.unlock();
     }
 
     private void statusChange(DistributedPrimitive.Status status) {
@@ -112,16 +121,35 @@
 
+    /**
+     * Withdraws every route whose source node is the given node.
+     * <p>
+     * Nothing is withdrawn when the node is the local node or when the
+     * cluster service reports it as READY.
+     *
+     * @param node identifier of the node whose routes should be withdrawn
+     */
     private void cleanRoutes(NodeId node) {
         log.info("Cleaning routes from unavailable node {}", node);
 
+        // Check the cheap guards first so that no route table is scanned for a
+        // node whose routes must be kept.
+        if (node.equals(clusterService.getLocalNode().id())) {
+            log.debug("Do not remove routes from local nodes {}", node);
+            return;
+        }
+
+        if (clusterService.getState(node) == ControllerNode.State.READY) {
+            log.debug("Do not remove routes from active nodes {}", node);
+            return;
+        }
+
         Collection<Route> routes = routeService.getRouteTables().stream()
                 .flatMap(id -> routeService.getRoutes(id).stream())
                 .flatMap(route -> route.allRoutes().stream())
                 .map(ResolvedRoute::route)
                 .filter(r -> r.sourceNode().equals(node))
                 .collect(Collectors.toList());
 
         log.debug("Withdrawing routes: {}", routes);
-
         routeService.withdraw(routes);
     }
 
@@ -133,7 +149,29 @@
             case INSTANCE_DEACTIVATED:
                 NodeId id = event.subject().id();
                 log.info("Node {} deactivated", id);
-                queue.addOne(id);
+
+                // Only queue the node for route removal if the distributed lock can be
+                // acquired. This is meant to guarantee that minority nodes won't try to
+                // remove routes that originated from majority nodes.
+                // Wait up to 15 seconds so that leadership election has time to complete.
+                asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+                    if (error != null) {
+                        // Surface the failure instead of silently treating it as a timeout.
+                        log.warn("Error while obtaining lock. Do not remove routes from {}", id, error);
+                        return;
+                    }
+                    if (result == null || !result.isPresent()) {
+                        log.debug("Fail to obtain lock. Do not remove routes from {}", id);
+                        return;
+                    }
+                    try {
+                        log.debug("Lock obtained. Put {} into removal queue", id);
+                        queue.addOne(id);
+                    } finally {
+                        // Always release the lock, even if enqueueing throws.
+                        asyncLock.unlock();
+                    }
+                });
                 break;
             case INSTANCE_ADDED:
             case INSTANCE_REMOVED:
@@ -144,5 +172,4 @@
             }
         }
     }
-
 }
diff --git a/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java b/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java
index 3ba34da..c6c0f82 100644
--- a/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java
+++ b/apps/route-service/app/src/test/java/org/onosproject/routeservice/impl/RouteManagerTest.java
@@ -47,6 +47,9 @@
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.host.HostServiceAdapter;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.DistributedLock;
+import org.onosproject.store.service.DistributedLockBuilder;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WorkQueue;
 
@@ -104,6 +107,18 @@
         routeManager.clusterService = createNiceMock(ClusterService.class);
         replay(routeManager.clusterService);
         routeManager.storageService = createNiceMock(StorageService.class);
+
+        AsyncDistributedLock adl = createNiceMock(AsyncDistributedLock.class);
+        expect(adl.asLock()).andReturn(createNiceMock(DistributedLock.class));
+        replay(adl);
+
+        DistributedLockBuilder dlb = createNiceMock(DistributedLockBuilder.class);
+        expect(dlb.withName(anyString())).andReturn(dlb);
+        expect(dlb.build()).andReturn(adl);
+        replay(dlb);
+
+        expect(routeManager.storageService.lockBuilder())
+                .andReturn(dlb);
         expect(routeManager.storageService.getWorkQueue(anyString(), anyObject()))
                 .andReturn(createNiceMock(WorkQueue.class));
         replay(routeManager.storageService);