LLDPLinkProvider listens to MastershipEvents

Change-Id: Iaa3655c680a8fc93921f0b83dc4fc16311222bf9
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
index 6dc464e..b8905ae 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
@@ -20,6 +20,8 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipListener;
 import org.onlab.onos.mastership.MastershipService;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.Device;
@@ -40,7 +42,11 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 
@@ -67,11 +73,17 @@
 
     private LinkProviderService providerService;
 
+    private ScheduledExecutorService executor;
+
     private final boolean useBDDP = true;
 
+    private static final long INIT_DELAY = 5; // seconds before the first SyncDeviceInfoTask run
+    private static final long DELAY = 5; // period, in seconds, of the SyncDeviceInfoTask schedule
 
     private final InternalLinkProvider listener = new InternalLinkProvider();
 
+    private final InternalRoleListener roleListener = new InternalRoleListener();
+
     protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
 
     /**
@@ -86,6 +98,8 @@
         providerService = providerRegistry.register(this);
         deviceService.addListener(listener);
         packetSevice.addProcessor(listener, 0);
+        masterService.addListener(roleListener);
+
         LinkDiscovery ld;
         for (Device device : deviceService.getDevices()) {
             ld = new LinkDiscovery(device, packetSevice, masterService,
@@ -98,22 +112,57 @@
             }
         }
 
+        executor = newSingleThreadScheduledExecutor(namedThreads("device-sync-%d"));
+        executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY,
+                DELAY, TimeUnit.SECONDS);
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        executor.shutdownNow(); // shut down the executor that runs SyncDeviceInfoTask
         for (LinkDiscovery ld : discoverers.values()) {
             ld.stop();
         }
         providerRegistry.unregister(this);
         deviceService.removeListener(listener);
         packetSevice.removeProcessor(listener);
+        masterService.removeListener(roleListener); // mirrors masterService.addListener(roleListener)
         providerService = null;
 
         log.info("Stopped");
     }
 
+    private class InternalRoleListener implements MastershipListener {
+        // Mastership listener that creates a LinkDiscovery for a device that has none yet.
+        @Override
+        public void event(MastershipEvent event) {
+            // BACKUPS_CHANGED is ignored; every other event type falls through to the check below.
+            if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
+                // only need new master events
+                return;
+            }
+
+            DeviceId deviceId = event.subject();
+            Device device = deviceService.getDevice(deviceId);
+            if (device == null) { // nothing to build a LinkDiscovery from
+                log.warn("Device {} doesn't exist, or isn't there yet", deviceId);
+                return;
+            }
+            synchronized (discoverers) { // check-then-put is done under the discoverers lock
+                if (!discoverers.containsKey(deviceId)) {
+                    // TODO: ideally, should never reach here
+                    log.debug("Device mastership changed ({}) {}",
+                            event.type(), deviceId);
+                    discoverers.put(deviceId, new LinkDiscovery(device,
+                            packetSevice, masterService, providerService,
+                            useBDDP));
+                }
+            }
+        }
+
+    }
 
     private class InternalLinkProvider implements PacketProcessor, DeviceListener {
 
@@ -131,17 +180,21 @@
             switch (event.type()) {
                 case DEVICE_ADDED:
                 case DEVICE_UPDATED:
+                synchronized (discoverers) {
                     ld = discoverers.get(deviceId);
                     if (ld == null) {
-                        log.debug("Device added ({}) {}", event.type(), deviceId);
-                        discoverers.put(deviceId,
-                               new LinkDiscovery(device, packetSevice, masterService,
-                                      providerService, useBDDP));
+                        log.debug("Device added ({}) {}", event.type(),
+                                    deviceId);
+                        discoverers.put(deviceId, new LinkDiscovery(device,
+                                packetSevice, masterService, providerService,
+                                useBDDP));
                     } else {
                         if (ld.isStopped()) {
-                            log.debug("Device restarted ({}) {}", event.type(), deviceId);
+                            log.debug("Device restarted ({}) {}", event.type(),
+                                    deviceId);
                             ld.start();
                         }
+                        }
                     }
                     break;
                 case PORT_ADDED:
@@ -193,15 +246,6 @@
                         ld.stop();
                     }
                     break;
-                case DEVICE_MASTERSHIP_CHANGED:
-                    if (!discoverers.containsKey(deviceId)) {
-                        // TODO: ideally, should never reach here
-                        log.debug("Device mastership changed ({}) {}", event.type(), deviceId);
-                        discoverers.put(deviceId,
-                               new LinkDiscovery(device, packetSevice, masterService,
-                                      providerService, useBDDP));
-                    }
-                    break;
                 default:
                     log.debug("Unknown event {}", event);
             }
@@ -224,4 +268,37 @@
         }
     }
 
+    private final class SyncDeviceInfoTask implements Runnable {
+        // Scheduled task: creates a LinkDiscovery for each known device that has none.
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+            // Compare the devices deviceService reports against discoverers and add what is missing.
+            try {
+                LinkDiscovery ld = null;
+                for (Device dev : deviceService.getDevices()) {
+                    DeviceId did = dev.id();
+                    synchronized (discoverers) { // check-then-put is done under the discoverers lock
+                        if (!discoverers.containsKey(did)) {
+                            ld = new LinkDiscovery(dev, packetSevice,
+                                    masterService, providerService, useBDDP);
+                            discoverers.put(did, ld);
+                            for (Port p : deviceService.getPorts(did)) {
+                                if (!p.number().isLogical()) { // logical ports are not added
+                                    ld.addPort(p);
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // Catch every Exception: an uncaught one would suppress all later scheduled runs.
+                log.error("Exception thrown during synchronization process", e);
+            }
+        }
+    }
+
 }