Add device service session health check and recovery.
Change-Id: I1c5902367653d08af6f74eeb45e60a2de6257b86
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
index 10cd467..da27b3a 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
@@ -22,7 +22,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
-
import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -80,6 +79,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Dictionary;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
@@ -122,6 +122,8 @@
*/
private static final int PROBE_INTERVAL = 3;
+ private static final long HEALTHCHECK_INTERVAL = 5;
+
private static final String PROP_SCHEME = "providerScheme";
private static final String DEFAULT_SCHEME = "bigswitch";
private static final String PROP_ID = "providerId";
@@ -160,7 +162,11 @@
private LinkProviderRegistry linkProviderRegistry;
private LinkProviderService linkProviderService;
private ScheduledExecutorService executor;
- private ScheduledFuture<?> future;
+ private ScheduledFuture<?> discovery;
+ /**
+ * Device session heart beat task handle.
+ */
+ private ScheduledFuture<?> heartBeat;
private BigSwitchListener listener = new InternalListener();
@@ -216,8 +222,8 @@
log.warn("Unsupported URI: {}", remoteUri);
}
providerId = new ProviderId(schemeProp, idProp);
- registerToDeviceProvider();
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/bigswitch", "discovery-%d"));
+ registerToDeviceProvider();
prepareProbe();
registerToLinkServices();
@@ -258,6 +264,7 @@
// register to Device and Link Providers with new parameters
try {
remoteServiceContext = rpcService.get(URI.create(remoteUri));
+ providerId = new ProviderId(schemeProp, idProp);
registerToDeviceProvider();
registerToLinkServices();
} catch (UnsupportedOperationException e) {
@@ -327,10 +334,16 @@
deviceProviderRegistry = remoteServiceContext.get(DeviceProviderRegistry.class);
deviceProviderService = deviceProviderRegistry.register(this);
// Start big switch service and register device
- deviceProviderService.deviceConnected(bigSwitch.id(), bigSwitchDescription);
- deviceProviderService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
+ advertiseDevice(bigSwitch.id());
+ advertisePorts(bigSwitch.id(), bigSwitchService.getPorts());
advertiseCrossConnectLinksOnAllPorts();
bigSwitchService.addListener(listener);
+
+ heartBeat = executor.scheduleAtFixedRate(new HeartBeatTask(),
+ HEALTHCHECK_INTERVAL,
+ HEALTHCHECK_INTERVAL,
+ TimeUnit.SECONDS);
+
}
private void registerToLinkServices() {
@@ -338,7 +351,7 @@
linkProviderRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
linkProviderService = linkProviderRegistry.register(this);
- future = executor.scheduleAtFixedRate(new DiscoveryTask(),
+ discovery = executor.scheduleAtFixedRate(new DiscoveryTask(),
PROBE_INTERVAL, PROBE_INTERVAL, TimeUnit.SECONDS);
// maybe also want a way to say 'get me next usable priority of class X'
@@ -346,17 +359,26 @@
}
+ /**
+ * Stops the device session heart beat and withdraws the big switch device
+ * from the remote device provider registry.
+ * <p>
+ * Tolerates being called when registration never completed: the heart beat
+ * handle is cancelled only if it was actually scheduled.
+ */
private void unregisterFromDeviceProvider() {
+ // heartBeat is only assigned at the end of registerToDeviceProvider(),
+ // so it is still null if registration never got that far
+ if (heartBeat != null) {
+ heartBeat.cancel(true);
+ }
+
if (bigSwitch == null) {
log.warn("Invalid unregistration.");
return;
}
- deviceProviderService.deviceDisconnected(bigSwitch.id());
+ bigSwitchService.removeListener(listener);
+
+ try {
+ deviceProviderService.deviceDisconnected(bigSwitch.id());
+ } catch (IllegalStateException e) {
+ log.warn("Exception on deviceDisconnected", e);
+ }
- deviceProviderRegistry.unregister(this);
+ // safeUnregister() tolerates IllegalStateException from unregister, so
+ // the local state reset below always runs
+ safeUnregister();
+ deviceProviderService = null;
bigSwitch = null;
}
+ /**
+ * Cancels the scheduled discovery task, removes the packet processor and
+ * unregisters this provider from the link provider registry.
+ */
private void unregisterFromLinkServices() {
- future.cancel(true);
+ discovery.cancel(true);
packetService.removeProcessor(packetProcessor);
linkProviderRegistry.unregister(this);
}
@@ -466,20 +488,67 @@
return ProbedLinkProvider.fingerprintMac(metadataService.getClusterMetadata());
}
+ /**
+ * Unregisters this provider from the device provider registry, tolerating
+ * an {@link IllegalStateException} thrown by the registry.
+ */
+ void safeUnregister() {
+ try {
+ deviceProviderRegistry.unregister(this);
+ } catch (IllegalStateException e) {
+ // best-effort: the failure is not fatal for the callers, but keep a
+ // trace of it instead of dropping it silently
+ log.debug("Ignoring exception on unregister", e);
+ }
+ }
+
+ /**
+ * Advertises the big switch device to the remote device provider service.
+ * <p>
+ * On {@link IllegalStateException} this provider is unregistered and
+ * registered again to obtain a fresh provider service, then the
+ * advertisement is retried once. An exception thrown by the retry
+ * propagates to the caller.
+ *
+ * @param did identifier of the device to advertise
+ */
+ void advertiseDevice(DeviceId did) {
+ try {
+ deviceProviderService.deviceConnected(did, bigSwitchDescription);
+ } catch (IllegalStateException e) {
+ log.warn("Exception thrown", e);
+ safeUnregister();
+ deviceProviderService = deviceProviderRegistry.register(this);
+ // retry exactly once on the fresh provider service
+ deviceProviderService.deviceConnected(did, bigSwitchDescription);
+ }
+ }
+
+ /**
+ * Reports a status change of a single port to the remote device provider
+ * service.
+ * <p>
+ * On {@link IllegalStateException} this provider is re-registered, the
+ * device is advertised again and the port update is retried once.
+ *
+ * @param did identifier of the device owning the port
+ * @param port description of the changed port
+ */
+ void advertisePort(DeviceId did, PortDescription port) {
+ try {
+ deviceProviderService.portStatusChanged(did, port);
+ } catch (IllegalStateException sessionError) {
+ log.warn("Exception thrown", sessionError);
+ // drop the old registration, then register again
+ safeUnregister();
+ deviceProviderService = deviceProviderRegistry.register(this);
+ // the device is advertised again before the port update is resent
+ deviceProviderService.deviceConnected(did, bigSwitchDescription);
+ deviceProviderService.portStatusChanged(did, port);
+ }
+ }
+
+ /**
+ * Sends the given port list of a device to the remote device provider
+ * service.
+ * <p>
+ * On {@link IllegalStateException} this provider is re-registered, the
+ * device is advertised again and the port list is resent once.
+ *
+ * @param did identifier of the device owning the ports
+ * @param ports port descriptions to advertise
+ */
+ void advertisePorts(DeviceId did, List<PortDescription> ports) {
+ try {
+ deviceProviderService.updatePorts(did, ports);
+ } catch (IllegalStateException sessionError) {
+ log.warn("Exception thrown", sessionError);
+ // drop the old registration, then register again
+ safeUnregister();
+ deviceProviderService = deviceProviderRegistry.register(this);
+ // the device is advertised again before the port list is resent
+ deviceProviderService.deviceConnected(did, bigSwitchDescription);
+ deviceProviderService.updatePorts(did, ports);
+ }
+ }
+
private class InternalListener implements BigSwitchListener {
@Override
public void event(BigSwitchEvent event) {
switch (event.type()) {
case PORT_ADDED:
case PORT_REMOVED:
- deviceProviderService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
+ advertisePorts(bigSwitch.id(), bigSwitchService.getPorts());
// if the subject's underlying port was a cross connect port,
// advertise cross-connect link to Metro-ONOS view
advertiseCrossConnectLinks(event.subject());
break;
case PORT_UPDATED:
- deviceProviderService.portStatusChanged(bigSwitch.id(), event.subject());
+ advertisePort(bigSwitch.id(), event.subject());
// if the subject's underlying port was a cross connect port,
// advertise cross-connect link to Metro-ONOS view
advertiseCrossConnectLinks(event.subject());
@@ -558,6 +627,22 @@
}
}
+ /**
+ * Periodic heart beat of Device RPC session.
+ * <p>
+ * Scheduled with {@code scheduleAtFixedRate}, which suppresses every later
+ * execution once a run throws, so no runtime exception may escape
+ * {@link #run()}.
+ */
+ public class HeartBeatTask implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ // Minimum side-effect RPC call as a heart beat message
+ advertisePorts(bigSwitch.id(), bigSwitchService.getPorts());
+ } catch (RuntimeException e) {
+ // covers the IllegalStateException of a failed retry as well as
+ // any other runtime failure (e.g. bigSwitch already cleared by
+ // unregistration), which would otherwise end the heart beat
+ log.warn("Exception caught sending heart beat", e);
+ }
+ }
+ }
+
private class InternalPacketProcessor implements PacketProcessor {
@Override
@@ -599,7 +684,7 @@
if (mac.equalsIgnoreCase(ourMac) || ProbedLinkProvider.defaultMac().equalsIgnoreCase(ourMac)) {
return false;
}
- // TODO Come up with more rubust way to identify bigswitch probe?
+ // TODO Come up with more robust way to identify big switch probe?
return probe.getDeviceString().contains("bigswitch");
}
}
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
index 0e8ec4f..c476700 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
@@ -124,10 +124,10 @@
.build(CacheLoader.from(portCounter::getAndIncrement));
eventDispatcher.addSink(BigSwitchEvent.class, listenerRegistry);
- //portCounter.compareAndSet(0, 1); // Start counting from 1, doesn't work??
- buildPorts();
+ portCounter.compareAndSet(0, 1);
edgePortService.addListener(edgeListener);
deviceService.addListener(deviceListener);
+ buildPorts();
log.info("Started");
}