Simplified Bmv2 device context service and context handling in demo apps
Change-Id: I2a13ed673902d0616732d43c841f50b1ad38cd4c
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
index bcdb939..4727913 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
@@ -29,7 +29,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
-import org.onlab.util.SharedExecutors;
+import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.bmv2.api.context.Bmv2Configuration;
import org.onosproject.bmv2.api.context.Bmv2DefaultConfiguration;
import org.onosproject.bmv2.api.context.Bmv2DeviceContext;
@@ -38,11 +38,17 @@
import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.service.Bmv2DeviceContextService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,30 +56,43 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.bmv2.api.context.Bmv2DefaultConfiguration.parse;
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
@Component(immediate = true)
@Service
public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
private static final String JSON_DEFAULT_CONFIG_PATH = "/default.json";
+ private static final long CHECK_INTERVAL = 5_000; // milliseconds
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private Bmv2Controller controller;
- private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
+ private final ScheduledExecutorService scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor();
+ private final MapEventListener<DeviceId, Bmv2DeviceContext> contextListener = new ContextMapEventListener();
+ private final ConcurrentMap<DeviceId, Boolean> deviceLocks = Maps.newConcurrentMap();
private ConsistentMap<DeviceId, Bmv2DeviceContext> contexts;
- private Map<DeviceId, Bmv2DeviceContext> contextsMap;
-
private Map<String, ClassLoader> interpreterClassLoaders;
-
private Bmv2DeviceContext defaultContext;
+ private ScheduledFuture<?> configChecker = null;
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -88,39 +107,55 @@
.withSerializer(Serializer.using(kryo))
.withName("onos-bmv2-contexts")
.build();
- contextsMap = contexts.asJavaMap();
-
- interpreterClassLoaders = Maps.newConcurrentMap();
Bmv2Configuration defaultConfiguration = loadDefaultConfiguration();
Bmv2Interpreter defaultInterpreter = new Bmv2DefaultInterpreterImpl();
defaultContext = new Bmv2DeviceContext(defaultConfiguration, defaultInterpreter);
- interpreterClassLoaders.put(defaultInterpreter.getClass().getName(), this.getClass().getClassLoader());
+ interpreterClassLoaders = Maps.newConcurrentMap();
+ registerInterpreterClassLoader(defaultInterpreter.getClass(), this.getClass().getClassLoader());
+
+ contexts.addListener(contextListener);
+
+ if (configChecker != null && configChecker.isCancelled()) {
+ configChecker.cancel(false);
+ }
+ configChecker = scheduledExecutor.scheduleAtFixedRate(this::checkDevices, 0, CHECK_INTERVAL,
+ TimeUnit.MILLISECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ contexts.removeListener(contextListener);
+ if (configChecker != null) {
+ configChecker.cancel(false);
+ }
log.info("Stopped");
}
@Override
public Bmv2DeviceContext getContext(DeviceId deviceId) {
checkNotNull(deviceId, "device id cannot be null");
- return contextsMap.get(deviceId);
+ Versioned<Bmv2DeviceContext> versionedContext = contexts.get(deviceId);
+ return (versionedContext == null) ? null : versionedContext.value();
}
@Override
- public void triggerConfigurationSwap(DeviceId deviceId, Bmv2DeviceContext context) {
+ public void setContext(DeviceId deviceId, Bmv2DeviceContext context) {
checkNotNull(deviceId, "device id cannot be null");
checkNotNull(context, "context cannot be null");
if (!interpreterClassLoaders.containsKey(context.interpreter().getClass().getName())) {
- log.error("Unable to trigger configuration swap, missing class loader for context interpreter. " +
- "Please register it with registerInterpreterClassLoader()");
+ log.error("Unable to set context, missing class loader for interpreter '{}'. " +
+ "Please register it with registerInterpreterClassLoader()",
+ context.interpreter().getClass().getName());
} else {
- executorService.execute(() -> executeConfigurationSwap(deviceId, context));
+ try {
+ contexts.put(deviceId, context);
+ } catch (ConsistentMapException.ConcurrentModification e) {
+ log.error("Detected concurrent modification on context map");
+ }
}
}
@@ -129,86 +164,83 @@
interpreterClassLoaders.put(interpreterClass.getName(), loader);
}
- private void executeConfigurationSwap(DeviceId deviceId, Bmv2DeviceContext context) {
- contexts.compute(deviceId, (key, existingValue) -> {
- if (context.equals(existingValue)) {
- log.info("Dropping swap request as one has already been triggered for the given context.");
- return existingValue;
+ @Override
+ public Bmv2DeviceContext defaultContext() {
+ return defaultContext;
+ }
+
+ private void configCheck(DeviceId deviceId) {
+ // Synchronize executions over the same deviceId.
+ deviceLocks.putIfAbsent(deviceId, new Boolean(true));
+ synchronized (deviceLocks.get(deviceId)) {
+
+ Bmv2DeviceContext storedContext = getContext(deviceId);
+ if (storedContext == null) {
+ return;
}
+
+ log.trace("Executing configuration check on {}...", deviceId);
+
try {
- Bmv2DeviceAgent agent = controller.getAgent(deviceId);
- String jsonString = context.configuration().json().toString();
- agent.loadNewJsonConfig(jsonString);
- agent.swapJsonConfig();
- return context;
+ // FIXME: JSON dump is heavy, can we use the JSON MD5 to check the running configuration?
+ String jsonString = controller.getAgent(deviceId).dumpJsonConfig();
+ Bmv2Configuration deviceConfiguration = parse(Json.parse(jsonString).asObject());
+
+ if (!storedContext.configuration().equals(deviceConfiguration)) {
+ log.info("Triggering configuration swap on {}...", deviceId);
+ try {
+ Bmv2DeviceAgent agent = controller.getAgent(deviceId);
+ String newJsonString = storedContext.configuration().json().toString();
+ agent.uploadNewJsonConfig(newJsonString);
+ agent.swapJsonConfig();
+ } catch (Bmv2RuntimeException e) {
+ log.error("Unable to swap configuration on {}: {}", deviceId, e.explain());
+ }
+ }
} catch (Bmv2RuntimeException e) {
- log.error("Unable to swap configuration on {}: {}", deviceId, e.explain());
- return existingValue;
+ log.warn("Unable to dump JSON configuration from {}: {}", deviceId, e.explain());
}
+ }
+ }
+
+ private void triggerConfigCheck(DeviceId deviceId) {
+ if (mastershipService.isLocalMaster(deviceId)) {
+ scheduledExecutor.schedule(() -> configCheck(deviceId), 0, TimeUnit.SECONDS);
+ }
+ }
+
+ private void checkDevices() {
+ deviceService.getAvailableDevices().forEach(device -> {
+ triggerConfigCheck(device.id());
});
}
- @Override
- public boolean notifyDeviceChange(DeviceId deviceId) {
- checkNotNull(deviceId, "device id cannot be null");
-
- Bmv2DeviceContext storedContext = getContext(deviceId);
-
- if (storedContext == null) {
- log.info("No context previously stored for {}, swapping to DEFAULT_CONTEXT.", deviceId);
- triggerConfigurationSwap(deviceId, defaultContext);
- // Device can be accepted.
- return false;
- } else {
- Bmv2Configuration deviceConfiguration = loadDeviceConfiguration(deviceId);
- if (deviceConfiguration == null) {
- log.warn("Unable to load configuration from device {}", deviceId);
- return false;
- }
- if (storedContext.configuration().equals(deviceConfiguration)) {
- return true;
- } else {
- log.info("Device context is different from the stored one, triggering configuration swap for {}...",
- deviceId);
- triggerConfigurationSwap(deviceId, storedContext);
- return false;
- }
- }
- }
-
- /**
- * Load and parse a BMv2 JSON configuration from the given device.
- *
- * @param deviceId a device id
- * @return a BMv2 configuration
- */
- private Bmv2Configuration loadDeviceConfiguration(DeviceId deviceId) {
- try {
- String jsonString = controller.getAgent(deviceId).dumpJsonConfig();
- return Bmv2DefaultConfiguration.parse(Json.parse(jsonString).asObject());
- } catch (Bmv2RuntimeException e) {
- log.warn("Unable to load JSON configuration from {}: {}", deviceId, e.explain());
- return null;
- }
- }
-
- /**
- * Loads default configuration from file.
- *
- * @return a BMv2 configuration
- */
protected static Bmv2DefaultConfiguration loadDefaultConfiguration() {
try {
JsonObject json = Json.parse(new BufferedReader(new InputStreamReader(
Bmv2DeviceContextServiceImpl.class.getResourceAsStream(JSON_DEFAULT_CONFIG_PATH)))).asObject();
- return Bmv2DefaultConfiguration.parse(json);
+ return parse(json);
} catch (IOException e) {
throw new RuntimeException("Unable to load default configuration", e);
}
}
/**
- * Internal BMv2 context serializer.
+ * Listener of context changes that immediately triggers config checks (to swap the config if necessary).
+ */
+ private class ContextMapEventListener implements MapEventListener<DeviceId, Bmv2DeviceContext> {
+ @Override
+ public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
+ DeviceId deviceId = event.key();
+ if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
+ log.trace("Context {} for {}", event.type().name(), deviceId);
+ triggerConfigCheck(deviceId);
+ }
+ }
+ }
+
+ /**
+ * Context serializer.
*/
private class BmvDeviceContextSerializer extends com.esotericsoftware.kryo.Serializer<Bmv2DeviceContext> {
@@ -222,7 +254,7 @@
public Bmv2DeviceContext read(Kryo kryo, Input input, Class<Bmv2DeviceContext> type) {
String jsonStr = kryo.readObject(input, String.class);
String interpreterClassName = kryo.readObject(input, String.class);
- Bmv2Configuration configuration = Bmv2DefaultConfiguration.parse(Json.parse(jsonStr).asObject());
+ Bmv2Configuration configuration = parse(Json.parse(jsonStr).asObject());
ClassLoader loader = interpreterClassLoaders.get(interpreterClassName);
if (loader == null) {
throw new IllegalStateException("No class loader registered for interpreter: " + interpreterClassName);