[ONOS-7143] Add arbitration update support by P4RuntimeClient
Change-Id: I671275576018d50447f969166a7b42a28dd93b1d
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 7c75c91..5072c65 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -156,5 +156,12 @@
*/
void shutdown();
+ /**
+ * Sends a master arbitration update to the device.
+ *
+ * @return a completable future containing true if the operation was successful; false otherwise
+ */
+ CompletableFuture<Boolean> sendMasterArbitrationUpdate();
+
// TODO: work in progress.
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index b095bca..3e015a4 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -75,4 +75,11 @@
* @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
*/
boolean isReacheable(DeviceId deviceId);
+
+ /**
+ * Gets new election id for device arbitration request.
+ *
+ * @return the election id
+ */
+ long getNewMasterElectionId();
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index 1f04374..ab0146f 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -33,7 +33,11 @@
* A packet-in.
*/
PACKET_IN,
- // TODO: add mastership, device as soon as we define those.
+
+ /**
+ * Arbitration reply.
+ */
+ ARBITRATION,
}
public P4RuntimeEvent(Type type, P4RuntimeEventSubject subject) {
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
new file mode 100644
index 0000000..7370dc7
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.MastershipRole;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+import p4.P4RuntimeOuterClass.Uint128;
+
+/**
+ * Default implementation of arbitration in P4Runtime.
+ */
+public class DefaultArbitration implements P4RuntimeEventSubject {
+ private MastershipRole role;
+ private Uint128 electionId;
+
+ /**
+ * Creates arbitration with given role and election id.
+ *
+ * @param role the role
+ * @param electionId the election id
+ */
+ public DefaultArbitration(MastershipRole role, Uint128 electionId) {
+ this.role = role;
+ this.electionId = electionId;
+ }
+
+ /**
+ * Gets the role of this arbitration.
+ *
+ * @return the role
+ */
+ public MastershipRole role() {
+ return role;
+ }
+
+ /**
+ * Gets election id of this arbitration.
+ *
+ * @return the election id
+ */
+ public Uint128 electionId() {
+ return electionId;
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 21bd31b..738417e 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -31,6 +31,7 @@
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiActionGroup;
import org.onosproject.net.pi.runtime.PiActionGroupMember;
@@ -60,6 +61,7 @@
import p4.P4RuntimeOuterClass.StreamMessageRequest;
import p4.P4RuntimeOuterClass.StreamMessageResponse;
import p4.P4RuntimeOuterClass.TableEntry;
+import p4.P4RuntimeOuterClass.Uint128;
import p4.P4RuntimeOuterClass.Update;
import p4.P4RuntimeOuterClass.WriteRequest;
import p4.config.P4InfoOuterClass.P4Info;
@@ -74,6 +76,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -104,6 +107,7 @@
WriteOperationType.MODIFY, Update.Type.MODIFY,
WriteOperationType.DELETE, Update.Type.DELETE
);
+ private static final String ARBITRATION_RESULT_MASTER = "Is master";
private final Logger log = getLogger(getClass());
@@ -117,6 +121,9 @@
private final Lock writeLock = new ReentrantLock();
private final StreamObserver<StreamMessageRequest> streamRequestObserver;
+ private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
+ protected Uint128 p4RuntimeElectionId;
+
/**
* Default constructor.
*
@@ -257,34 +264,44 @@
"dumpGroups-" + actionProfileId.id());
}
+ @Override
+ public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
+ return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
+ }
+
/* Blocking method implementations below */
+ private boolean doArbitrationUpdate() {
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ // TODO: currently we use 64-bit Long type for election id, should
+ // we use 128-bit ?
+ long nextElectId = controller.getNewMasterElectionId();
+ Uint128 newElectionId = Uint128.newBuilder()
+ .setLow(nextElectId)
+ .build();
+ MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
+ .setDeviceId(p4DeviceId)
+ .setElectionId(newElectionId)
+ .build();
+ StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
+ .setArbitration(arbitrationUpdate)
+ .build();
+ log.debug("Sending arbitration update to {} with election id {}...",
+ deviceId, newElectionId);
+ arbitrationUpdateMap.put(newElectionId, result);
+ try {
+ streamRequestObserver.onNext(requestMsg);
+ return result.get();
+ } catch (InterruptedException | ExecutionException | StatusRuntimeException e) {
+ log.warn("Arbitration update failed for {} due to {}", deviceId, e);
+ arbitrationUpdateMap.remove(newElectionId);
+ return false;
+ }
+ }
private boolean doInitStreamChannel() {
// To listen for packets and other events, we need to start the RPC.
// Here we do it by sending a master arbitration update.
- log.info("initializing stream chanel on {}...", deviceId);
- if (!doArbitrationUpdate()) {
- log.warn("Unable to initialize stream channel for {}", deviceId);
- return false;
- } else {
- return true;
- }
- }
-
- private boolean doArbitrationUpdate() {
- log.info("Sending arbitration update to {}...", deviceId);
- StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
- .setArbitration(MasterArbitrationUpdate.newBuilder()
- .setDeviceId(p4DeviceId)
- .build())
- .build();
- try {
- streamRequestObserver.onNext(requestMsg);
- return true;
- } catch (StatusRuntimeException e) {
- log.warn("Arbitration update failed for {}: {}", deviceId, e);
- return false;
- }
+ return doArbitrationUpdate();
}
private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -315,6 +332,7 @@
SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
.newBuilder()
+ .setElectionId(p4RuntimeElectionId)
.setAction(VERIFY_AND_COMMIT)
.addConfigs(pipelineConfig)
.build();
@@ -330,7 +348,6 @@
private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
PiPipeconf pipeconf) {
-
WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
@@ -350,11 +367,7 @@
writeRequestBuilder
.setDeviceId(p4DeviceId)
- /* PI ignores this ElectionId, commenting out for now.
- .setElectionId(Uint128.newBuilder()
- .setHigh(0)
- .setLow(ELECTION_ID)
- .build()) */
+ .setElectionId(p4RuntimeElectionId)
.addAllUpdates(updateMsgs)
.build();
@@ -455,8 +468,33 @@
}
private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
+ log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
- log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
+ Uint128 electionId = arbitrationMsg.getElectionId();
+ CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
+
+ if (mastershipFeature == null) {
+ log.warn("Can't find completable future of election id {}", electionId);
+ return;
+ }
+
+ this.p4RuntimeElectionId = electionId;
+ int statusCode = arbitrationMsg.getStatus().getCode();
+ MastershipRole arbitrationRole;
+ // arbitration update success
+
+ if (statusCode == Status.OK.getCode().value()) {
+ mastershipFeature.complete(true);
+ arbitrationRole = MastershipRole.MASTER;
+ } else {
+ mastershipFeature.complete(false);
+ arbitrationRole = MastershipRole.STANDBY;
+ }
+
+ DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
+ P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
+ arbitrationEventSubject);
+ controller.postEvent(event);
}
private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
@@ -490,7 +528,6 @@
}
private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
-
final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
try {
for (PiActionGroupMember member : group.members()) {
@@ -518,6 +555,7 @@
WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
+ .setElectionId(p4RuntimeElectionId)
.addAllUpdates(updateMsgs)
.build();
try {
@@ -652,7 +690,6 @@
}
private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
-
final ActionProfileGroup actionProfileGroup;
try {
actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
@@ -663,6 +700,7 @@
final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
+ .setElectionId(p4RuntimeElectionId)
.addUpdates(Update.newBuilder()
.setEntity(Entity.newBuilder()
.setActionProfileGroup(actionProfileGroup)
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 383b857..57703a1 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,6 +35,8 @@
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.IOException;
@@ -55,19 +57,26 @@
extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
+ private static final String P4R_ELECTION = "p4runtime-election";
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
// TODO: should use a cache to delete unused locks.
private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
+ private AtomicCounter electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
public GrpcController grpcController;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ public StorageService storageService;
+
@Activate
public void activate() {
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
+ electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
+
log.info("Started");
}
@@ -186,6 +195,11 @@
}
}
+ @Override
+ public long getNewMasterElectionId() {
+ return electionIdGenerator.incrementAndGet();
+ }
+
public void postEvent(P4RuntimeEvent event) {
post(event);
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index fda5053..debf6a6 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -48,6 +48,7 @@
import p4.P4RuntimeOuterClass.ActionProfileGroup;
import p4.P4RuntimeOuterClass.ActionProfileMember;
import p4.P4RuntimeOuterClass.Entity;
+import p4.P4RuntimeOuterClass.Uint128;
import p4.P4RuntimeOuterClass.Update;
import p4.P4RuntimeOuterClass.WriteRequest;
@@ -101,6 +102,7 @@
private static final int SET_EGRESS_PORT_ID = 16794308;
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
+ private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
private P4RuntimeClientImpl client;
private P4RuntimeControllerImpl controller;
@@ -156,6 +158,7 @@
client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
grpcChannel,
controller);
+ client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
}
@Test
@@ -166,6 +169,7 @@
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
assertEquals(1, result.getDeviceId());
assertEquals(1, result.getUpdatesCount());
+ assertEquals(DEFAULT_ELECTION_ID, result.getElectionId());
Update update = result.getUpdatesList().get(0);
assertEquals(Update.Type.INSERT, update.getType());
@@ -194,6 +198,7 @@
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
assertEquals(1, result.getDeviceId());
assertEquals(3, result.getUpdatesCount());
+ assertEquals(DEFAULT_ELECTION_ID, result.getElectionId());
List<Update> updates = result.getUpdatesList();
for (Update update : updates) {
diff --git a/protocols/p4runtime/proto/BUCK b/protocols/p4runtime/proto/BUCK
index a50c71f..f8623f8 100644
--- a/protocols/p4runtime/proto/BUCK
+++ b/protocols/p4runtime/proto/BUCK
@@ -5,7 +5,7 @@
PROTOBUF_VER = '3.0.2'
GRPC_VER = '1.3.0'
-PI_COMMIT = '9fc50cd0a0187eb1346272524d4b8bafb51bb513'
+PI_COMMIT = 'a8814a8ac40838a9df83fe47a17a025b69026fcf'
PI_BASEURL = 'https://github.com/p4lang/PI.git'
# Wondering which .proto files to build? Check p4runtime's Makefile: