Improve scalability of P4Runtime subsystem
The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.
Apart from fixing those deadlocks, this patch brings a number of
improvements that together allow running networks of 100+ P4Runtime
devices on a single ONOS instance (previously only ~20 devices).
Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow spec)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspect deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index cc62acb..46540b3 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -3,6 +3,7 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
+ '//lib:KRYO',
'//protocols/grpc/api:onos-protocols-grpc-api',
'//protocols/p4runtime/api:onos-protocols-p4runtime-api',
'//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
@@ -10,6 +11,7 @@
'//lib:grpc-stub-' + GRPC_VER,
'//lib:grpc-netty-' + GRPC_VER,
'//lib:protobuf-java-' + PROTOBUF_VER,
+ '//core/store/serializers:onos-core-serializers',
]
TEST_DEPS = [
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
index ba31e69..4dbcac3 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
@@ -31,7 +31,7 @@
/**
* Encoder/Decoder of action profile member.
*/
-public final class ActionProfileMemberEncoder {
+final class ActionProfileMemberEncoder {
private ActionProfileMemberEncoder() {
// Hide default constructor
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
new file mode 100644
index 0000000..ea3c24f
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * P4Runtime arbitration response, signalling master status for a device.
+ */
+class ArbitrationResponse implements P4RuntimeEventSubject {
+
+ private DeviceId deviceId;
+ private boolean isMaster;
+
+ /**
+ * Creates arbitration response with given device ID and master flag.
+ *
+ * @param deviceId the device
+ * @param isMaster true if arbitration response signals master status
+ */
+ ArbitrationResponse(DeviceId deviceId, boolean isMaster) {
+ this.deviceId = deviceId;
+ this.isMaster = isMaster;
+ }
+
+ /**
+ * Returns true if arbitration response signals master status, false
+ * otherwise.
+ *
+ * @return boolean flag
+ */
+ boolean isMaster() {
+ return isMaster;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
new file mode 100644
index 0000000..6e33514
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Channel event in P4Runtime.
+ */
+final class ChannelEvent implements P4RuntimeEventSubject {
+
+ enum Type {
+ OPEN,
+ CLOSED,
+ ERROR
+ }
+
+ private DeviceId deviceId;
+ private Type type;
+
+ /**
+ * Creates channel event with given device ID and type.
+ *
+ * @param deviceId the device
+ * @param type event type
+ */
+ ChannelEvent(DeviceId deviceId, Type type) {
+ this.deviceId = deviceId;
+ this.type = type;
+ }
+
+ /**
+ * Gets the type of this event.
+ *
+ * @return the event type
+ */
+ public Type type() {
+ return type;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
deleted file mode 100644
index e8b8658..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-import p4.v1.P4RuntimeOuterClass.Uint128;
-
-/**
- * Default implementation of arbitration in P4Runtime.
- */
-public class DefaultArbitration implements P4RuntimeEventSubject {
- private MastershipRole role;
- private Uint128 electionId;
- private DeviceId deviceId;
-
- /**
- * Creates arbitration with given role and election id.
- *
- * @param deviceId the device
- * @param role the role
- * @param electionId the election id
- */
- DefaultArbitration(DeviceId deviceId, MastershipRole role, Uint128 electionId) {
- this.deviceId = deviceId;
- this.role = role;
- this.electionId = electionId;
- }
-
- /**
- * Gets the role of this arbitration.
- *
- * @return the role
- */
- public MastershipRole role() {
- return role;
- }
-
- /**
- * Gets election id of this arbitration.
- *
- * @return the election id
- */
- public Uint128 electionId() {
- return electionId;
- }
-
- @Override
- public DeviceId deviceId() {
- return deviceId;
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
deleted file mode 100644
index 5f5b3b9..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent.Type;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Default implementation of channel event in P4Runtime. It allows passing any type of event.
- * If the event is an error a throwable can be directly passed.
- * Any other type of event cause can be passed as string.
- */
-public class DefaultChannelEvent implements P4RuntimeEventSubject {
- private DeviceId deviceId;
- private Type type;
- private Throwable throwable;
- private String message;
-
- /**
- * Creates channel event with given status and throwable.
- *
- * @param deviceId the device
- * @param type error type
- * @param throwable the cause
- */
- public DefaultChannelEvent(DeviceId deviceId, Type type, Throwable throwable) {
- this.deviceId = deviceId;
- this.type = type;
- this.message = throwable.getMessage();
- this.throwable = throwable;
- }
-
- /**
- * Creates channel event with given status and string cause.
- *
- * @param deviceId the device
- * @param type error type
- * @param message the message
- */
- public DefaultChannelEvent(DeviceId deviceId, Type type, String message) {
- this.deviceId = deviceId;
- this.type = type;
- this.message = message;
- this.throwable = null;
- }
-
- /**
- * Creates channel event with given status, cause and throwable.
- *
- * @param deviceId the device
- * @param type error type
- * @param message the message
- * @param throwable the cause
- */
- public DefaultChannelEvent(DeviceId deviceId, Type type, String message, Throwable throwable) {
- this.deviceId = deviceId;
- this.type = type;
- this.message = message;
- this.throwable = throwable;
- }
-
- /**
- * Gets the type of this event.
- *
- * @return the error type
- */
- public Type type() {
- return type;
- }
-
- /**
- * Gets the message related to this event.
- *
- * @return the message
- */
- public String message() {
- return message;
- }
-
-
- /**
- * Gets throwable of this event.
- * If no throwable is present returns null.
- *
- * @return the throwable
- */
- public Throwable throwable() {
- return throwable;
- }
-
- @Override
- public DeviceId deviceId() {
- return deviceId;
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
new file mode 100644
index 0000000..ecfe35d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of a generator of P4Runtime election IDs.
+ */
+class DistributedElectionIdGenerator {
+
+ private final Logger log = getLogger(this.getClass());
+
+ private AtomicCounterMap<DeviceId> electionIds;
+
+ /**
+ * Creates a new election ID generator using the given storage service.
+ *
+ * @param storageService storage service
+ */
+ DistributedElectionIdGenerator(StorageService storageService) {
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .build();
+ this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
+ .withName("p4runtime-election-ids")
+ .withSerializer(Serializer.using(serializer))
+ .build();
+ }
+
+ /**
+ * Returns an election ID for the given device ID. The first election ID for
+ * a given device ID is always 1.
+ *
+ * @param deviceId device ID
+ * @return new election ID, or null if this generator has been destroyed
+ */
+ BigInteger generate(DeviceId deviceId) {
+ if (electionIds == null) {
+ return null;
+ }
+ // Default value is 0 for AtomicCounterMap.
+ return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
+ }
+
+ /**
+ * Destroy the backing distributed primitive of this generator.
+ */
+ void destroy() {
+ try {
+ electionIds.destroy().get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Exception while destroying distributed counter map", e);
+ } finally {
+ electionIds = null;
+ }
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 43d01e5..c952e61 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -19,7 +19,6 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -30,10 +29,9 @@
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelEvent;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -52,6 +50,8 @@
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
+import p4.config.v1.P4InfoOuterClass.P4Info;
+import p4.tmp.P4Config;
import p4.v1.P4RuntimeGrpc;
import p4.v1.P4RuntimeOuterClass;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -59,7 +59,6 @@
import p4.v1.P4RuntimeOuterClass.Entity;
import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
-import p4.v1.P4RuntimeOuterClass.PacketIn;
import p4.v1.P4RuntimeOuterClass.ReadRequest;
import p4.v1.P4RuntimeOuterClass.ReadResponse;
import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
@@ -69,9 +68,8 @@
import p4.v1.P4RuntimeOuterClass.Uint128;
import p4.v1.P4RuntimeOuterClass.Update;
import p4.v1.P4RuntimeOuterClass.WriteRequest;
-import p4.config.v1.P4InfoOuterClass.P4Info;
-import p4.tmp.P4Config;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@@ -81,7 +79,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -99,13 +96,17 @@
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
+import static p4.v1.P4RuntimeOuterClass.PacketIn;
import static p4.v1.P4RuntimeOuterClass.PacketOut;
import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
/**
* Implementation of a P4Runtime client.
*/
-public final class P4RuntimeClientImpl implements P4RuntimeClient {
+final class P4RuntimeClientImpl implements P4RuntimeClient {
+
+ // Timeout in seconds to obtain the client lock.
+ private static final int LOCK_TIMEOUT = 10;
private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
@@ -116,18 +117,20 @@
private final Logger log = getLogger(getClass());
+ private final Lock requestLock = new ReentrantLock();
+ private final Context.CancellableContext cancellableContext =
+ Context.current().withCancellation();
+
private final DeviceId deviceId;
private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
- private final Context.CancellableContext cancellableContext;
private final ExecutorService executorService;
private final Executor contextExecutor;
- private final Lock writeLock = new ReentrantLock();
private final StreamObserver<StreamMessageRequest> streamRequestObserver;
- private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
- protected Uint128 p4RuntimeElectionId;
+ // Used by this client for write requests.
+ private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
/**
* Default constructor.
@@ -142,45 +145,77 @@
this.deviceId = deviceId;
this.p4DeviceId = p4DeviceId;
this.controller = controller;
- this.cancellableContext = Context.current().withCancellation();
this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
- "onos/p4runtime-client-" + deviceId.toString(),
- deviceId.toString() + "-%d"));
+ "onos-p4runtime-client-" + deviceId.toString(), "%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
- //TODO Investigate deadline or timeout in supplyInContext Method
+ //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
- P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
- this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
+ this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
+ .streamChannel(new StreamChannelResponseObserver());
}
/**
- * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
- * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
- * <p>
- * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
- * <p>
+ * Submits a task for async execution via the given executor.
+ * All tasks submitted with this method will be executed sequentially.
*/
- private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
+ private <U> CompletableFuture<U> supplyWithExecutor(
+ Supplier<U> supplier, String opDescription, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
// TODO: explore a more relaxed locking strategy.
- writeLock.lock();
+ try {
+ if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+ log.error("LOCK TIMEOUT! This is likely a deadlock, "
+ + "please debug (executing {})",
+ opDescription);
+ throw new IllegalThreadStateException("Lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted while waiting for lock (executing {})",
+ opDescription);
+ throw new RuntimeException(e);
+ }
try {
return supplier.get();
} catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
+ log.warn("Unable to execute {} on {}: {}",
+ opDescription, deviceId, ex.toString());
throw ex;
} catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
+ log.error("Exception in client of {}, executing {}",
+ deviceId, opDescription, ex);
throw ex;
} finally {
- writeLock.unlock();
+ requestLock.unlock();
}
- }, contextExecutor);
+ }, executor);
+ }
+
+ /**
+ * Equivalent of supplyWithExecutor using the gRPC context executor of this
+ * client, such that if the context is cancelled (e.g. client shutdown) the
+ * RPC is automatically cancelled.
+ */
+ private <U> CompletableFuture<U> supplyInContext(
+ Supplier<U> supplier, String opDescription) {
+ return supplyWithExecutor(supplier, opDescription, contextExecutor);
}
@Override
- public CompletableFuture<Boolean> initStreamChannel() {
- return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
+ public CompletableFuture<Boolean> start() {
+ return supplyInContext(this::doInitStreamChannel,
+ "start-initStreamChannel");
+ }
+
+ @Override
+ public CompletableFuture<Void> shutdown() {
+ return supplyWithExecutor(this::doShutdown, "shutdown",
+ SharedExecutors.getPoolThreadExecutor());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> becomeMaster() {
+ return supplyInContext(this::doBecomeMaster,
+ "becomeMaster");
}
@Override
@@ -201,6 +236,11 @@
}
@Override
+ public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
+ return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
+ }
+
+ @Override
public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
}
@@ -245,11 +285,6 @@
}
@Override
- public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
- return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
- }
-
- @Override
public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
@@ -272,42 +307,48 @@
/* Blocking method implementations below */
- private boolean doArbitrationUpdate() {
+ private boolean doBecomeMaster() {
+ final Uint128 newId = bigIntegerToUint128(
+ controller.newMasterElectionId(deviceId));
+ if (sendMasterArbitrationUpdate(newId)) {
+ clientElectionId = newId;
+ return true;
+ }
+ return false;
+ }
- CompletableFuture<Boolean> result = new CompletableFuture<>();
- // TODO: currently we use 64-bit Long type for election id, should
- // we use 128-bit ?
- long nextElectId = controller.getNewMasterElectionId();
- Uint128 newElectionId = Uint128.newBuilder()
- .setLow(nextElectId)
- .build();
- MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
- .setDeviceId(p4DeviceId)
- .setElectionId(newElectionId)
- .build();
- StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
- .setArbitration(arbitrationUpdate)
- .build();
- log.debug("Sending arbitration update to {} with election id {}...",
- deviceId, newElectionId);
- arbitrationUpdateMap.put(newElectionId, result);
+ private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
+ log.info("Sending arbitration update to {}... electionId={}",
+ deviceId, uint128ToBigInteger(electionId));
try {
- streamRequestObserver.onNext(requestMsg);
- return result.get();
+ streamRequestObserver.onNext(
+ StreamMessageRequest.newBuilder()
+ .setArbitration(
+ MasterArbitrationUpdate
+ .newBuilder()
+ .setDeviceId(p4DeviceId)
+ .setElectionId(electionId)
+ .build())
+ .build());
+ return true;
} catch (StatusRuntimeException e) {
log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
- arbitrationUpdateMap.remove(newElectionId);
- return false;
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Arbitration update failed for {} due to {}", deviceId, e);
- arbitrationUpdateMap.remove(newElectionId);
- return false;
}
+ return false;
}
+
private boolean doInitStreamChannel() {
// To listen for packets and other events, we need to start the RPC.
- // Here we do it by sending a master arbitration update.
- return doArbitrationUpdate();
+ // Here we send an empty StreamMessageRequest.
+ try {
+ log.info("Starting stream channel with {}...", deviceId);
+ streamRequestObserver.onNext(StreamMessageRequest.newBuilder().build());
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.error("Unable to start stream channel with {}: {}",
+ deviceId, e.getMessage());
+ return false;
+ }
}
private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -338,7 +379,7 @@
SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
.newBuilder()
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.setAction(VERIFY_AND_COMMIT)
.setConfig(pipelineConfig)
.build();
@@ -380,7 +421,7 @@
writeRequestBuilder
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
@@ -388,7 +429,7 @@
blockingStub.write(writeRequestBuilder.build());
return true;
} catch (StatusRuntimeException e) {
- logWriteErrors(piTableEntries, e, opType, "table entry");
+ checkAndLogWriteErrors(piTableEntries, e, opType, "table entry");
return false;
}
}
@@ -397,13 +438,18 @@
log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
- P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
int tableId;
- try {
- tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
- } catch (P4InfoBrowser.NotFoundException e) {
- log.warn("Unable to dump table: {}", e.getMessage());
- return Collections.emptyList();
+ if (piTableId == null) {
+ // Dump all tables.
+ tableId = 0;
+ } else {
+ P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+ try {
+ tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
+ } catch (P4InfoBrowser.NotFoundException e) {
+ log.warn("Unable to dump table: {}", e.getMessage());
+ return Collections.emptyList();
+ }
}
ReadRequest requestMsg = ReadRequest.newBuilder()
@@ -474,40 +520,29 @@
}
// Decode packet message and post event.
PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
- DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
+ PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
log.debug("Received packet in: {}", event);
controller.postEvent(event);
}
- private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
- log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
-
- Uint128 electionId = arbitrationMsg.getElectionId();
- CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
-
- if (mastershipFeature == null) {
- log.warn("Can't find completable future of election id {}", electionId);
+ private void doArbitrationResponse(MasterArbitrationUpdate msg) {
+ // From the spec...
+ // - Election_id: The stream RPC with the highest election_id is the
+ // master. Switch populates with the highest election ID it
+ // has received from all connected controllers.
+ // - Status: Switch populates this with OK for the client that is the
+ // master, and with an error status for all other connected clients (at
+ // every mastership change).
+ if (!msg.hasElectionId() || !msg.hasStatus()) {
return;
}
-
- this.p4RuntimeElectionId = electionId;
- int statusCode = arbitrationMsg.getStatus().getCode();
- MastershipRole arbitrationRole;
- // arbitration update success
-
- if (statusCode == Status.OK.getCode().value()) {
- mastershipFeature.complete(true);
- arbitrationRole = MastershipRole.MASTER;
- } else {
- mastershipFeature.complete(false);
- arbitrationRole = MastershipRole.STANDBY;
- }
-
- DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
- P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
- arbitrationEventSubject);
- controller.postEvent(event);
+ final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
+ log.info("Received arbitration update from {}: isMaster={}, electionId={}",
+ deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+ new ArbitrationResponse(deviceId, isMaster)));
}
private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -583,14 +618,14 @@
WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
try {
blockingStub.write(writeRequestMsg);
return true;
} catch (StatusRuntimeException e) {
- logWriteErrors(members, e, opType, "group member");
+ checkAndLogWriteErrors(members, e, opType, "group member");
return false;
}
}
@@ -729,7 +764,7 @@
final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addUpdates(Update.newBuilder()
.setEntity(Entity.newBuilder()
.setActionProfileGroup(actionProfileGroup)
@@ -741,7 +776,7 @@
blockingStub.write(writeRequestMsg);
return true;
} catch (StatusRuntimeException e) {
- logWriteErrors(Collections.singleton(group), e, opType, "group");
+ checkAndLogWriteErrors(Collections.singleton(group), e, opType, "group");
return false;
}
}
@@ -814,7 +849,7 @@
writeRequestBuilder
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
try {
@@ -827,53 +862,34 @@
}
}
- /**
- * Returns the internal P4 device ID associated with this client.
- *
- * @return P4 device ID
- */
- public long p4DeviceId() {
- return p4DeviceId;
- }
-
- /**
- * For testing purpose only. TODO: remove before release.
- *
- * @return blocking stub
- */
- public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
- return this.blockingStub;
- }
-
-
- @Override
- public void shutdown() {
-
+ private Void doShutdown() {
log.info("Shutting down client for {}...", deviceId);
-
- writeLock.lock();
- try {
- if (streamRequestObserver != null) {
- streamRequestObserver.onCompleted();
- cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
- }
-
- this.executorService.shutdown();
+ if (streamRequestObserver != null) {
try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
+ streamRequestObserver.onCompleted();
+ } catch (IllegalStateException e) {
+ // Thrown if stream channel is already completed. Can ignore.
+ log.debug("Ignored expection: {}", e);
}
- } finally {
- writeLock.unlock();
+ cancellableContext.cancel(new InterruptedException(
+ "Requested client shutdown"));
}
+ this.executorService.shutdown();
+ try {
+ executorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Executor service didn't shutdown in time.");
+ Thread.currentThread().interrupt();
+ }
+ return null;
}
- private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
- StatusRuntimeException ex,
- WriteOperationType opType,
- String entryType) {
+ private <E extends PiEntity> void checkAndLogWriteErrors(
+ Collection<E> writeEntities, StatusRuntimeException ex,
+ WriteOperationType opType, String entryType) {
+
+ checkGrpcException(ex);
+
List<P4RuntimeOuterClass.Error> errors = null;
String description = null;
try {
@@ -946,10 +962,80 @@
err.hasDetails() ? "\n" + err.getDetails().toString() : "");
}
+ /**
+ * Inspects the status code of a failed gRPC call and, for the codes that
+ * matter to upper layers, posts the corresponding event to the controller.
+ * All other status codes are ignored.
+ *
+ * @param ex exception returned by the gRPC call
+ */
+ private void checkGrpcException(StatusRuntimeException ex) {
+ switch (ex.getStatus().getCode()) {
+ case PERMISSION_DENIED:
+ // Notify upper layers that this node is not master.
+ final ArbitrationResponse notMaster =
+ new ArbitrationResponse(deviceId, false);
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.ARBITRATION_RESPONSE, notMaster));
+ break;
+ case UNAVAILABLE:
+ // Channel might be closed.
+ final ChannelEvent channelError =
+ new ChannelEvent(deviceId, ChannelEvent.Type.ERROR);
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.CHANNEL_EVENT, channelError));
+ break;
+ default:
+ // No other status code triggers a notification.
+ break;
+ }
+ }
+
+
+ /**
+ * Converts a non-negative integer of at most 128 bits into a
+ * {@link Uint128}, storing the upper 64 bits in the high field and the
+ * lower 64 bits in the low field.
+ *
+ * @param value integer in the range [0, 2^128 - 1]
+ * @return Uint128 carrying the same 128-bit value
+ * @throws IllegalArgumentException if value is negative or does not fit
+ * in 128 bits
+ */
+ private Uint128 bigIntegerToUint128(BigInteger value) {
+ if (value.signum() < 0 || value.bitLength() > Long.SIZE * 2) {
+ throw new IllegalArgumentException(
+ "Value cannot be represented as Uint128: " + value);
+ }
+ // BigInteger.longValue() keeps only the 64 low-order bits, which is the
+ // truncation needed for each half. Unlike copying toByteArray() into a
+ // 16-byte buffer, this also works when bit 127 is set (toByteArray()
+ // then returns 17 bytes because of the extra sign byte).
+ return Uint128.newBuilder()
+ .setHigh(value.shiftRight(Long.SIZE).longValue())
+ .setLow(value.longValue())
+ .build();
+ }
+
+ /**
+ * Converts a {@link Uint128} into the non-negative integer formed by its
+ * high field (upper 64 bits) followed by its low field (lower 64 bits).
+ *
+ * @param value Uint128 to convert
+ * @return non-negative integer with the same 128-bit value
+ */
+ private BigInteger uint128ToBigInteger(Uint128 value) {
+ // Signum 1 forces an unsigned reading of the 16 bytes; the byte[]-only
+ // constructor is two's-complement and would return a negative number
+ // whenever the top bit of the high field is set.
+ return new BigInteger(1,
+ ByteBuffer.allocate(Long.BYTES * 2)
+ .putLong(value.getHigh())
+ .putLong(value.getLow())
+ .array());
+ }
+
/**
* Handles messages received from the device on the stream channel.
*/
- private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
+ private class StreamChannelResponseObserver
+ implements StreamObserver<StreamMessageResponse> {
@Override
public void onNext(StreamMessageResponse message) {
@@ -958,40 +1044,40 @@
+ /**
+ * Dispatches a message received on the stream channel according to its
+ * update case: packets are passed to doPacketIn, arbitration updates to
+ * doArbitrationResponse, and any other case is logged as unrecognized.
+ * Any Throwable raised while processing is caught and logged.
+ *
+ * @param message message received on the stream channel
+ */
private void doNext(StreamMessageResponse message) {
try {
- log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+ log.debug("Received message on stream channel from {}: {}",
+ deviceId, message.getUpdateCase());
switch (message.getUpdateCase()) {
case PACKET:
- // Packet-in
doPacketIn(message.getPacket());
return;
case ARBITRATION:
- doArbitrationUpdateFromDevice(message.getArbitration());
+ doArbitrationResponse(message.getArbitration());
return;
default:
- log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+ log.warn("Unrecognized stream message from {}: {}",
+ deviceId, message.getUpdateCase());
}
} catch (Throwable ex) {
- log.error("Exception while processing stream channel message from {}", deviceId, ex);
+ log.error("Exception while processing stream message from {}",
+ deviceId, ex);
}
}
@Override
public void onError(Throwable throwable) {
- log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
- controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
- new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
- throwable)));
- // FIXME: we might want to recreate the channel.
- // In general, we want to be robust against any transient error and, if the channel is open, make sure the
- // stream channel is always on.
+ // Log the gRPC status derived from the throwable, then report a channel
+ // ERROR event for this device to the controller.
+ log.warn("Error on stream channel for {}: {}",
+ deviceId, Status.fromThrowable(throwable));
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
}
@Override
public void onCompleted() {
+ // Stream completion is reported to the controller as a channel CLOSED
+ // event for this device.
log.warn("Stream channel for {} has completed", deviceId);
- controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
- new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
- "Stream channel has completed")));
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 987356b..d2773b2 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,20 +35,21 @@
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigInteger;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,14 +66,13 @@
extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- private static final String P4R_ELECTION = "p4runtime-election";
private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
- private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
.expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
.build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -81,8 +81,7 @@
return new ReentrantReadWriteLock();
}
});
-
- private AtomicCounter electionIdGenerator;
+ private DistributedElectionIdGenerator electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private GrpcController grpcController;
@@ -93,8 +92,7 @@
@Activate
public void activate() {
+ // Register the listener registry as sink for P4Runtime events and create
+ // the election ID generator on top of the storage service.
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
- electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
-
+ electionIdGenerator = new DistributedElectionIdGenerator(storageService);
log.info("Started");
}
@@ -102,6 +100,8 @@
@Deactivate
public void deactivate() {
grpcController = null;
+ // Destroy the election ID generator before dropping the reference to it.
+ electionIdGenerator.destroy();
+ electionIdGenerator = null;
eventDispatcher.removeSink(P4RuntimeEvent.class);
log.info("Stopped");
}
@@ -119,13 +119,13 @@
.usePlaintext(true);
deviceLocks.getUnchecked(deviceId).writeLock().lock();
- log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
try {
if (deviceIdToClientKey.containsKey(deviceId)) {
final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
if (newKey.equals(existingKey)) {
+ log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
return true;
} else {
throw new IllegalStateException(
@@ -133,6 +133,8 @@
"server endpoints already exists");
}
} else {
+ log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
return doCreateClient(newKey, channelBuilder);
}
} finally {
@@ -187,12 +189,11 @@
public void removeClient(DeviceId deviceId) {
deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
try {
if (deviceIdToClientKey.containsKey(deviceId)) {
final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
- grpcController.disconnectChannel(channelIds.get(deviceId));
clientKeyToClient.remove(clientKey).shutdown();
+ grpcController.disconnectChannel(channelIds.get(deviceId));
deviceIdToClientKey.remove(deviceId);
channelIds.remove(deviceId);
}
@@ -222,7 +223,7 @@
log.debug("No client for {}, can't check for reachability", deviceId);
return false;
}
-
+ // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
return grpcController.isChannelOpen(channelIds.get(deviceId));
} finally {
deviceLocks.getUnchecked(deviceId).readLock().unlock();
@@ -230,67 +231,73 @@
}
@Override
- public long getNewMasterElectionId() {
- return electionIdGenerator.incrementAndGet();
+ public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+ // computeIfAbsent creates the per-device list only when the device has
+ // none yet and returns the mapped list, so no list is allocated on every
+ // call and no second lookup is needed.
+ deviceAgentListeners
+ .computeIfAbsent(deviceId, did -> new CopyOnWriteArrayList<>())
+ .add(listener);
}
@Override
- public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
- channelListeners.compute(deviceId, (devId, listeners) -> {
- List<ChannelListener> newListeners;
- if (listeners != null) {
- newListeners = listeners;
- } else {
- newListeners = new ArrayList<>();
- }
- newListeners.add(listener);
- return newListeners;
+ public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+ // Does nothing when the device has no listener list. The list itself is
+ // returned unchanged to the map, so it stays mapped even when empty.
+ deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+ listeners.remove(listener);
+ return listeners;
});
}
- @Override
- public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
- channelListeners.compute(deviceId, (devId, listeners) -> {
- if (listeners != null) {
- listeners.remove(listener);
- return listeners;
- } else {
- log.debug("Device {} has no listener registered", deviceId);
- return null;
- }
- });
+ /**
+ * Returns the election ID produced by the election ID generator for the
+ * given device.
+ *
+ * @param deviceId device ID
+ * @return election ID
+ */
+ BigInteger newMasterElectionId(DeviceId deviceId) {
+ return electionIdGenerator.generate(deviceId);
}
+ /**
+ * Routes the given event according to its type: channel events go to
+ * handleChannelEvent, arbitration responses go to handleArbitrationReply,
+ * and every other event is passed to post.
+ *
+ * @param event event to process
+ */
void postEvent(P4RuntimeEvent event) {
- if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
- DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
- DeviceId deviceId = event.subject().deviceId();
- ChannelEvent channelEvent = null;
- //If disconnection is already known we propagate it.
- if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
- channelError.throwable());
- } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
- //If we don't know what the error is we check for reachability
- if (!isReacheable(deviceId)) {
- //if false the channel has disconnected
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
- channelError.throwable());
- } else {
- // else we propagate the event.
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
- channelError.throwable());
- }
- }
- //Ignoring CHANNEL_CONNECTED
- if (channelEvent != null && channelListeners.get(deviceId) != null) {
- for (ChannelListener listener : channelListeners.get(deviceId)) {
- listener.event(channelEvent);
- }
- }
- } else {
- post(event);
+ switch (event.type()) {
+ case CHANNEL_EVENT:
+ handleChannelEvent(event);
+ break;
+ case ARBITRATION_RESPONSE:
+ handleArbitrationReply(event);
+ break;
+ default:
+ post(event);
+ break;
}
}
+ /**
+ * Translates a channel event into a device agent event and delivers it to
+ * the listeners of the device. An ERROR is reported as CHANNEL_ERROR when
+ * the device is still reachable and as CHANNEL_CLOSED otherwise.
+ * Unrecognized channel event types are logged and dropped.
+ *
+ * @param event event whose subject is a channel event
+ */
+ private void handleChannelEvent(P4RuntimeEvent event) {
+ final ChannelEvent chEvent = (ChannelEvent) event.subject();
+ final DeviceId devId = chEvent.deviceId();
+ final DeviceAgentEvent.Type type;
+ switch (chEvent.type()) {
+ case OPEN:
+ type = DeviceAgentEvent.Type.CHANNEL_OPEN;
+ break;
+ case CLOSED:
+ type = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+ break;
+ case ERROR:
+ if (isReacheable(devId)) {
+ type = DeviceAgentEvent.Type.CHANNEL_ERROR;
+ } else {
+ type = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+ }
+ break;
+ default:
+ log.warn("Unrecognized channel event type {}", chEvent.type());
+ return;
+ }
+ final DeviceAgentEvent agentEvent = new DeviceAgentEvent(type, devId);
+ postDeviceAgentEvent(devId, agentEvent);
+ }
+
+
+ /**
+ * Translates an arbitration response into a ROLE_MASTER or ROLE_STANDBY
+ * device agent event, depending on whether the response reports
+ * mastership, and delivers it to the listeners of the device.
+ *
+ * @param event event whose subject is an arbitration response
+ */
+ private void handleArbitrationReply(P4RuntimeEvent event) {
+ final DeviceId devId = event.subject().deviceId();
+ final ArbitrationResponse reply = (ArbitrationResponse) event.subject();
+ final DeviceAgentEvent.Type role;
+ if (reply.isMaster()) {
+ role = DeviceAgentEvent.Type.ROLE_MASTER;
+ } else {
+ role = DeviceAgentEvent.Type.ROLE_STANDBY;
+ }
+ final DeviceAgentEvent agentEvent =
+ new DeviceAgentEvent(role, reply.deviceId());
+ postDeviceAgentEvent(devId, agentEvent);
+ }
+
+
+ /**
+ * Delivers the given event to every listener registered for the device.
+ * Does nothing if the device has no listener list.
+ *
+ * @param deviceId device ID
+ * @param event event to deliver
+ */
+ private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+ final List<DeviceAgentListener> listeners = deviceAgentListeners.get(deviceId);
+ if (listeners == null) {
+ return;
+ }
+ for (DeviceAgentListener listener : listeners) {
+ listener.event(event);
+ }
+ }
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
similarity index 89%
rename from protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
rename to protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
index a1cce46..dddee2b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
@@ -25,14 +25,14 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Default implementation of a packet-in in P4Runtime.
+ * P4Runtime packet-in.
*/
-final class DefaultPacketIn implements P4RuntimePacketIn {
+final class PacketInEvent implements P4RuntimePacketIn {
private final DeviceId deviceId;
private final PiPacketOperation operation;
- DefaultPacketIn(DeviceId deviceId, PiPacketOperation operation) {
+ PacketInEvent(DeviceId deviceId, PiPacketOperation operation) {
this.deviceId = checkNotNull(deviceId);
this.operation = checkNotNull(operation);
}
@@ -55,7 +55,7 @@
if (o == null || getClass() != o.getClass()) {
return false;
}
- DefaultPacketIn that = (DefaultPacketIn) o;
+ PacketInEvent that = (PacketInEvent) o;
return Objects.equal(deviceId, that.deviceId) &&
Objects.equal(operation, that.operation);
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 1e83577..893049f 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -144,7 +144,7 @@
}
@AfterClass
- public static void globalTeerDown() {
+ public static void globalTearDown() {
grpcServer.shutdown();
grpcChannel.shutdown();
}
@@ -156,7 +156,7 @@
client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
grpcChannel,
controller);
- client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
+ client.becomeMaster();
}
@Test
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
similarity index 88%
rename from protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
rename to protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
index 5993142..0a1f82e 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
@@ -32,7 +32,7 @@
/**
* Test for DefaultPacketIn class.
*/
-public class DefaultPacketInTest {
+public class PacketInEventTest {
private static final int DEFAULT_ORIGINAL_VALUE = 255;
private static final int DEFAULT_BIT_WIDTH = 9;
@@ -46,10 +46,10 @@
private PiPacketOperation packetOperation2;
private PiPacketOperation nullPacketOperation = null;
- private DefaultPacketIn packetIn;
- private DefaultPacketIn sameAsPacketIn;
- private DefaultPacketIn packetIn2;
- private DefaultPacketIn packetIn3;
+ private PacketInEvent packetIn;
+ private PacketInEvent sameAsPacketIn;
+ private PacketInEvent packetIn2;
+ private PacketInEvent packetIn3;
/**
* Setup method for packetOperation and packetOperation2.
@@ -78,10 +78,10 @@
.build())
.build();
- packetIn = new DefaultPacketIn(deviceId, packetOperation);
- sameAsPacketIn = new DefaultPacketIn(sameDeviceId, packetOperation);
- packetIn2 = new DefaultPacketIn(deviceId2, packetOperation);
- packetIn3 = new DefaultPacketIn(deviceId, packetOperation2);
+ packetIn = new PacketInEvent(deviceId, packetOperation);
+ sameAsPacketIn = new PacketInEvent(sameDeviceId, packetOperation);
+ packetIn2 = new PacketInEvent(deviceId2, packetOperation);
+ packetIn3 = new PacketInEvent(deviceId, packetOperation2);
}
/**
@@ -105,7 +105,7 @@
@Test(expected = NullPointerException.class)
public void testConstructorWithNullDeviceId() {
- new DefaultPacketIn(nullDeviceId, packetOperation);
+ new PacketInEvent(nullDeviceId, packetOperation);
}
/**
@@ -114,7 +114,7 @@
@Test(expected = NullPointerException.class)
public void testConstructorWithNullPacketOperation() {
- new DefaultPacketIn(deviceId, nullPacketOperation);
+ new PacketInEvent(deviceId, nullPacketOperation);
}
/**