[ONOS-6810] Implement Mastership handling in general DeviceProvider
Change-Id: I14b706d364cf5124da248230fbcda65d0bd284ce
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index d60e9b3..212cf1c 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -28,6 +28,9 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
@@ -79,6 +82,9 @@
protected NetworkConfigRegistry cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -87,6 +93,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PiPipeconfMappingStore pipeconfMappingStore;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
// Registered pipeconf are replicated through the app subsystem and registered on app activated events.
protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
@@ -200,21 +209,28 @@
// due to 1:1:1 pipeconf:driver:provider maybe find better way
DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
- //we register to the dirver susbystem the driver provider containing the merged driver
+ //we register with the driver subsystem the driver provider containing the merged driver
driverAdminService.registerProvider(provider);
}
// Changing the configuration for the device to enforce the full driver with pipipeconf
- // and base behaviours
- ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
- newCfg = newCfg.put(DRIVER, completeDriverName);
- ObjectMapper mapper = new ObjectMapper();
- JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
- cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
- // Completable future is needed for when this method will also apply the pipeline to the device.
- // FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
- // the association between device and pipeconf.
- pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+ // and base behaviours; config and binding are updated only if the driver differs and this node is the leader
+ NodeId leaderNodeId = leadershipService.getLeader("deploy-" +
+ deviceId.toString() + "-pipeconf");
+ NodeId localNodeId = clusterService.getLocalNode().id();
+
+ if (!basicDeviceConfig.driver().equals(completeDriverName) && localNodeId.equals(leaderNodeId)) {
+ ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
+ newCfg = newCfg.put(DRIVER, completeDriverName);
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+ log.debug("New driver {} for device {}", completeDriverName, deviceId);
+ cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+ // Completable future is needed for when this method will also apply the pipeline to the device.
+ // FIXME (maybe): the pipeline is currently applied by the general device provider.
+ // But we store here the association between device and pipeconf.
+ pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+ }
operationResult.complete(true);
}
});
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 7e41a99..f49ad18 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -66,7 +66,7 @@
client = controller.getClient(deviceId);
if (client == null || !controller.isReacheable(deviceId)) {
- result.complete(MastershipRole.STANDBY);
+ result.complete(MastershipRole.NONE);
return result;
}
if (newRole.equals(MastershipRole.MASTER)) {
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 91b9db0..269936a 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -168,6 +168,8 @@
doDummyMessage(channels.get(channelId));
return true;
} catch (IOException e) {
+ log.warn("Error in sending dummy message to device {}", channelId);
+ log.debug("Exception ", e);
return false;
}
} finally {
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index a77dc33..824ec79 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -30,7 +30,11 @@
import org.onlab.util.ItemNotFoundException;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
@@ -110,6 +114,10 @@
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
public static final String DRIVER = "driver";
+ public static final int REACHABILITY_TIMEOUT = 10;
+ public static final String DEPLOY = "deploy-";
+ public static final String PIPECONF_TOPIC = "-pipeconf";
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -131,8 +139,17 @@
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PiPipeconfService piPipeconfService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
@Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
label = "Configure poll frequency for port status and statistics; " +
@@ -257,17 +274,26 @@
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
- log.debug("Received role {} for device {}", newRole, deviceId);
+ log.info("Received role {} for device {}", newRole, deviceId);
CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
- roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
+ roleReply.thenAcceptAsync(mastership -> {
+ providerService.receivedRoleReply(deviceId, newRole, mastership);
+ if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
+ scheduledTasks.get(deviceId).cancel(false);
+ scheduledTasks.remove(deviceId);
+ } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
+ scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+ updatePortStatistics(deviceId);
+ }
+ });
}
@Override
public boolean isReachable(DeviceId deviceId) {
- log.debug("Testing rechability for device {}", deviceId);
+ log.debug("Testing reachability for device {}", deviceId);
CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
try {
- return reachable.get(10, TimeUnit.SECONDS);
+ return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Device {} is not reachable", deviceId, e);
return false;
@@ -368,15 +394,7 @@
return;
}
- //Storing deviceKeyId and all other config values
- // as data in the driver with protocol_<info>
- // name as the key. e.g protocol_ip
- providerConfig.protocolsInfo()
- .forEach((protocol, deviceInfoConfig) -> {
- deviceInfoConfig.configValues()
- .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
- driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
- });
+ addConfigData(providerConfig, driverData);
//Connecting to the device
CompletableFuture<Boolean> connected = handshaker.connect();
@@ -409,7 +427,7 @@
ports = deviceDiscovery.discoverPortDetails();
}
- if (!handlePipeconf(deviceId, driver, driverData)) {
+ if (!handlePipeconf(deviceId, driver, driverData, true)) {
// Something went wrong during handling of pipeconf.
// We already logged the error.
handshaker.disconnect();
@@ -425,11 +443,37 @@
}
}
+ private void connectStandbyDevice(DeviceId deviceId) {
+
+ //if device is pipeline programmable we merge pipeconf + base driver for every other role
+ GeneralProviderDeviceConfig providerConfig =
+ cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
+
+ Driver driver = getDriver(deviceId);
+
+
+ DriverData driverData = new DefaultDriverData(driver, deviceId);
+ DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+ if (handshaker == null) {
+ log.error("Device {}, with driver {} does not support DeviceHandshaker " +
+ "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
+ return;
+ }
+ addConfigData(providerConfig, driverData);
+
+ //Connecting to the device
+ handshaker.connect().thenAcceptAsync(result -> {
+ if (result) {
+ handlePipeconf(deviceId, driver, driverData, false);
+ }
+ });
+ }
+
/**
* Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
* device can be registered to the core, false otherwise.
*/
- private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
+ private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
driverData);
@@ -439,6 +483,42 @@
return true;
}
+ PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+
+ if (pipeconf != null) {
+
+ PiPipeconfId pipeconfId = pipeconf.id();
+
+ try {
+ if (deployPipeconf) {
+ if (!pipelineProg.deployPipeconf(pipeconf).get()) {
+ log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
+ pipeconfId, deviceId);
+ return false;
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+ return false;
+ }
+ try {
+ if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
+ log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
+ driver.name(), deviceId, pipeconfId);
+ return false;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+ return false;
+ }
+ } else {
+ return false;
+ }
+
+ return true;
+ }
+
+ private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
// No pipeconf has been associated with this device.
// Check if device driver provides a default one.
@@ -453,33 +533,16 @@
if (pipeconfId == null) {
log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
- return false;
+ return null;
}
if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
log.warn("Pipeconf {} is not registered", pipeconfId);
- return false;
+ return null;
}
- PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
-
- try {
- if (!pipelineProg.deployPipeconf(pipeconf).get()) {
- log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
- return false;
- }
-
- if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
- log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
- driver.name(), deviceId, pipeconfId);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException(e);
- }
-
- return true;
+ return piPipeconfService.getPipeconf(pipeconfId).get();
}
private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
@@ -492,7 +555,6 @@
DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker != null) {
CompletableFuture<Boolean> disconnect = handshaker.disconnect();
-
disconnect.thenAcceptAsync(result -> {
if (result) {
log.info("Disconnected device {}", deviceId);
@@ -560,6 +622,28 @@
log.info("Device {} is already connected to ONOS and is available", deviceId);
return;
}
+ NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
+ .leader().nodeId();
+ NodeId localNodeId = clusterService.getLocalNode().id();
+ if (localNodeId.equals(leaderNodeId)) {
+ if (processEvent(event, deviceId)) {
+ log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
+ deviceId);
+ checkAndSubmitDeviceTask(deviceId);
+ }
+ } else {
+ if (processEvent(event, deviceId)) {
+ log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
+ localNodeId, deviceId, leaderNodeId);
+ connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
+ //FIXME this will be removed when config is synced
+ cleanUpConfigInfo(deviceId);
+ }
+ }
+
+ }
+
+ private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
//FIXME to be removed when netcfg will issue device events in a bundle or
// ensure all configuration needed is present
Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
@@ -590,7 +674,7 @@
// in the pipelineConfigured
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
&& pipelineConfigured.contains(deviceId)) {
- checkAndSubmitDeviceTask(deviceId);
+ return true;
} else {
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
log.debug("Waiting for pipeline configuration for device {}", deviceId);
@@ -604,6 +688,7 @@
log.debug("Only device configuration for device {}", deviceId);
}
}
+ return false;
} finally {
lock.unlock();
}
@@ -622,10 +707,26 @@
private void checkAndSubmitDeviceTask(DeviceId deviceId) {
connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
//FIXME this will be removed when configuration is synced.
+ cleanUpConfigInfo(deviceId);
+
+ }
+
+ private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
+ //Storing deviceKeyId and all other config values
+ // as data in the driver with protocol_<info>
+ // name as the key. e.g protocol_ip
+ providerConfig.protocolsInfo()
+ .forEach((protocol, deviceInfoConfig) -> {
+ deviceInfoConfig.configValues()
+ .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
+ driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
+ });
+ }
+
+ private void cleanUpConfigInfo(DeviceId deviceId) {
deviceConfigured.remove(deviceId);
driverConfigured.remove(deviceId);
pipelineConfigured.remove(deviceId);
-
}
private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
@@ -650,10 +751,13 @@
//For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
- scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
- updatePortStatistics(deviceId);
+ if (mastershipService.isLocalMaster(deviceId)) {
+ scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+ updatePortStatistics(deviceId);
+ }
} else if (type.equals(Type.DEVICE_REMOVED)) {
+
connectionExecutor.submit(exceptionSafe(() ->
disconnectDevice(deviceId)));
}
diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
index fb4a97a..8693eff 100644
--- a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
+++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
@@ -22,6 +22,7 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
@@ -67,6 +68,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
private PacketProviderService providerService;
private InternalPacketListener packetListener = new InternalPacketListener();
@@ -98,7 +102,7 @@
if (packet != null) {
DeviceId deviceId = packet.sendThrough();
Device device = deviceService.getDevice(deviceId);
- if (device.is(PacketProgrammable.class)) {
+ if (device.is(PacketProgrammable.class) && mastershipService.isLocalMaster(deviceId)) {
PacketProgrammable packetProgrammable = device.as(PacketProgrammable.class);
packetProgrammable.emit(packet);
} else {
@@ -148,7 +152,10 @@
@Override
public void event(P4RuntimeEvent event) {
- if (event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+ //Mastership message is sent to everybody but picked up only by master.
+ //FIXME we need the device ID in p4RuntimeEventSubject to check for mastership
+ if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+ log.debug("Event type {}", event.type());
// Not a packet-in event, ignore it.
return;
}
@@ -163,7 +170,7 @@
if (!device.is(PiPipelineInterpreter.class)) {
log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour",
- deviceId);
+ deviceId);
return;
}
@@ -184,7 +191,7 @@
log.debug("Processing inbound packet: {}", inPkt.toString());
OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null,
- operation.data().asReadOnlyBuffer());
+ operation.data().asReadOnlyBuffer());
PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false);
// Pushing the packet context up for processing.