Add device service session health check and recovery.

Change-Id: I1c5902367653d08af6f74eeb45e60a2de6257b86
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
index 10cd467..da27b3a 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
@@ -22,7 +22,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalNotification;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -80,6 +79,7 @@
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
@@ -122,6 +122,8 @@
      */
     private static final int PROBE_INTERVAL = 3;
 
+    private static final long HEALTHCHECK_INTERVAL = 5;
+
     private static final String PROP_SCHEME = "providerScheme";
     private static final String DEFAULT_SCHEME = "bigswitch";
     private static final String PROP_ID = "providerId";
@@ -160,7 +162,11 @@
     private LinkProviderRegistry linkProviderRegistry;
     private LinkProviderService linkProviderService;
     private ScheduledExecutorService executor;
-    private ScheduledFuture<?> future;
+    private ScheduledFuture<?> discovery;
+    /**
+     * Device session heart beat task handle.
+     */
+    private ScheduledFuture<?> heartBeat;
 
     private BigSwitchListener listener = new InternalListener();
 
@@ -216,8 +222,8 @@
             log.warn("Unsupported URI: {}", remoteUri);
         }
         providerId = new ProviderId(schemeProp, idProp);
-        registerToDeviceProvider();
         executor = newSingleThreadScheduledExecutor(groupedThreads("onos/bigswitch", "discovery-%d"));
+        registerToDeviceProvider();
         prepareProbe();
         registerToLinkServices();
 
@@ -258,6 +264,7 @@
             // register to Device and Link Providers with new parameters
             try {
                 remoteServiceContext = rpcService.get(URI.create(remoteUri));
+                providerId = new ProviderId(schemeProp, idProp);
                 registerToDeviceProvider();
                 registerToLinkServices();
             } catch (UnsupportedOperationException e) {
@@ -327,10 +334,16 @@
         deviceProviderRegistry = remoteServiceContext.get(DeviceProviderRegistry.class);
         deviceProviderService = deviceProviderRegistry.register(this);
         // Start big switch service and register device
-        deviceProviderService.deviceConnected(bigSwitch.id(), bigSwitchDescription);
-        deviceProviderService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
+        advertiseDevice(bigSwitch.id());
+        advertisePorts(bigSwitch.id(), bigSwitchService.getPorts());
         advertiseCrossConnectLinksOnAllPorts();
         bigSwitchService.addListener(listener);
+
+        heartBeat = executor.scheduleAtFixedRate(new HeartBeatTask(),
+                                                   HEALTHCHECK_INTERVAL,
+                                                   HEALTHCHECK_INTERVAL,
+                                                   TimeUnit.SECONDS);
+
     }
 
     private void registerToLinkServices() {
@@ -338,7 +351,7 @@
         linkProviderRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
         linkProviderService = linkProviderRegistry.register(this);
 
-        future = executor.scheduleAtFixedRate(new DiscoveryTask(),
+        discovery = executor.scheduleAtFixedRate(new DiscoveryTask(),
                                               PROBE_INTERVAL, PROBE_INTERVAL, TimeUnit.SECONDS);
 
         // maybe also want a way to say 'get me next usable priority of class X'
@@ -346,17 +359,26 @@
     }
 
     private void unregisterFromDeviceProvider() {
+        heartBeat.cancel(true);
+
         if (bigSwitch == null) {
             log.warn("Invalid unregistration.");
             return;
         }
-        deviceProviderService.deviceDisconnected(bigSwitch.id());
+        bigSwitchService.removeListener(listener);
+
+        try {
+            deviceProviderService.deviceDisconnected(bigSwitch.id());
+        } catch (IllegalStateException e) {
+            log.warn("Exception on deviceDisconnected", e);
+        }
         deviceProviderRegistry.unregister(this);
+        deviceProviderService = null;
         bigSwitch = null;
     }
 
     private void unregisterFromLinkServices() {
-        future.cancel(true);
+        discovery.cancel(true);
         packetService.removeProcessor(packetProcessor);
         linkProviderRegistry.unregister(this);
     }
@@ -466,20 +488,67 @@
         return ProbedLinkProvider.fingerprintMac(metadataService.getClusterMetadata());
     }
 
+    void safeUnregister() {
+        try {
+            deviceProviderRegistry.unregister(this);
+        } catch (IllegalStateException e) {
+            // silently-ignore error
+        }
+    }
+
+    void advertiseDevice(DeviceId did) {
+        try {
+            deviceProviderService.deviceConnected(did, bigSwitchDescription);
+        } catch (IllegalStateException e) {
+            log.warn("Exception thrown", e);
+            safeUnregister();
+            deviceProviderService = deviceProviderRegistry.register(this);
+            deviceProviderService.deviceConnected(did, bigSwitchDescription);
+            // retry
+            deviceProviderService.deviceConnected(did, bigSwitchDescription);
+        }
+    }
+
+    void advertisePort(DeviceId did, PortDescription port) {
+        try {
+            deviceProviderService.portStatusChanged(did, port);
+        } catch (IllegalStateException e) {
+            log.warn("Exception thrown", e);
+            safeUnregister();
+            deviceProviderService = deviceProviderRegistry.register(this);
+            deviceProviderService.deviceConnected(did, bigSwitchDescription);
+            // retry
+            deviceProviderService.portStatusChanged(did, port);
+        }
+    }
+
+    void advertisePorts(DeviceId did, List<PortDescription> ports) {
+        try {
+            deviceProviderService.updatePorts(did, ports);
+        } catch (IllegalStateException e) {
+            log.warn("Exception thrown", e);
+            safeUnregister();
+            deviceProviderService = deviceProviderRegistry.register(this);
+            deviceProviderService.deviceConnected(did, bigSwitchDescription);
+            // retry
+            deviceProviderService.updatePorts(did, ports);
+        }
+    }
+
     private class InternalListener implements BigSwitchListener {
         @Override
         public void event(BigSwitchEvent event) {
             switch (event.type()) {
                 case PORT_ADDED:
                 case PORT_REMOVED:
-                    deviceProviderService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
+                    advertisePorts(bigSwitch.id(), bigSwitchService.getPorts());
                     // if the subject's underlying port was a cross connect port,
                     // advertise cross-connect link to Metro-ONOS view
                     advertiseCrossConnectLinks(event.subject());
                     break;
 
                 case PORT_UPDATED:
-                    deviceProviderService.portStatusChanged(bigSwitch.id(), event.subject());
+                    advertisePort(bigSwitch.id(), event.subject());
                     // if the subject's underlying port was a cross connect port,
                     // advertise cross-connect link to Metro-ONOS view
                     advertiseCrossConnectLinks(event.subject());
@@ -558,6 +627,22 @@
         }
     }
 
+    /**
+     * Periodic heart beat of Device RPC session.
+     */
+    public class HeartBeatTask implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                // Minimum side-effect RPC call as a heart beat message
+                advertisePorts(bigSwitch.id(), bigSwitchService.getPorts());
+            } catch (IllegalStateException e) {
+                log.warn("Exception caught sending heart beat", e);
+            }
+        }
+    }
+
     private class InternalPacketProcessor implements PacketProcessor {
 
         @Override
@@ -599,7 +684,7 @@
             if (mac.equalsIgnoreCase(ourMac) || ProbedLinkProvider.defaultMac().equalsIgnoreCase(ourMac)) {
                 return false;
             }
-            // TODO Come up with more rubust way to identify bigswitch probe?
+            // TODO Come up with more robust way to identify big switch probe?
             return probe.getDeviceString().contains("bigswitch");
         }
     }
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
index 0e8ec4f..c476700 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
@@ -124,10 +124,10 @@
                 .build(CacheLoader.from(portCounter::getAndIncrement));
 
         eventDispatcher.addSink(BigSwitchEvent.class, listenerRegistry);
-        //portCounter.compareAndSet(0, 1);     // Start counting from 1, doesn't work??
-        buildPorts();
+        portCounter.compareAndSet(0, 1);
         edgePortService.addListener(edgeListener);
         deviceService.addListener(deviceListener);
+        buildPorts();
         log.info("Started");
     }