Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that all together allow to run networks of 100+ P4Runtime
devices on a single ONOS instance (before only ~20 devices)

Includes:
- Asynchrounous mastership handling in DevicHandshaker (as defined in
the P4Runtime and OpenFlow spec)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspect deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index cc62acb..46540b3 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -3,6 +3,7 @@
 
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
+    '//lib:KRYO',
     '//protocols/grpc/api:onos-protocols-grpc-api',
     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
     '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
@@ -10,6 +11,7 @@
     '//lib:grpc-stub-' + GRPC_VER,
     '//lib:grpc-netty-' + GRPC_VER,
     '//lib:protobuf-java-' + PROTOBUF_VER,
+    '//core/store/serializers:onos-core-serializers',
 ]
 
 TEST_DEPS = [
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
index ba31e69..4dbcac3 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
@@ -31,7 +31,7 @@
 /**
  * Encoder/Decoder of action profile member.
  */
-public final class ActionProfileMemberEncoder {
+final class ActionProfileMemberEncoder {
     private ActionProfileMemberEncoder() {
         // Hide default constructor
     }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
new file mode 100644
index 0000000..ea3c24f
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Default implementation of arbitration in P4Runtime.
+ */
+class ArbitrationResponse implements P4RuntimeEventSubject {
+
+    private DeviceId deviceId;
+    private boolean isMaster;
+
+    /**
+     * Creates arbitration with given role and master flag.
+     *
+     * @param deviceId the device
+     * @param isMaster true if arbitration response signals master status
+     */
+    ArbitrationResponse(DeviceId deviceId, boolean isMaster) {
+        this.deviceId = deviceId;
+        this.isMaster = isMaster;
+    }
+
+    /**
+     * Returns true if arbitration response signals master status, false
+     * otherwise.
+     *
+     * @return boolean flag
+     */
+    boolean isMaster() {
+        return isMaster;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
new file mode 100644
index 0000000..6e33514
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Channel event in P4Runtime.
+ */
+final class ChannelEvent implements P4RuntimeEventSubject {
+
+    enum Type {
+        OPEN,
+        CLOSED,
+        ERROR
+    }
+
+    private DeviceId deviceId;
+    private Type type;
+
+    /**
+     * Creates channel event with given status and throwable.
+     *
+     * @param deviceId  the device
+     * @param type      error type
+     */
+    ChannelEvent(DeviceId deviceId, Type type) {
+        this.deviceId = deviceId;
+        this.type = type;
+    }
+
+    /**
+     * Gets the type of this event.
+     *
+     * @return the error type
+     */
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
deleted file mode 100644
index e8b8658..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-import p4.v1.P4RuntimeOuterClass.Uint128;
-
-/**
- * Default implementation of arbitration in P4Runtime.
- */
-public class DefaultArbitration implements P4RuntimeEventSubject {
-    private MastershipRole role;
-    private Uint128 electionId;
-    private DeviceId deviceId;
-
-    /**
-     * Creates arbitration with given role and election id.
-     *
-     * @param deviceId   the device
-     * @param role       the role
-     * @param electionId the election id
-     */
-     DefaultArbitration(DeviceId deviceId, MastershipRole role, Uint128 electionId) {
-        this.deviceId = deviceId;
-        this.role = role;
-        this.electionId = electionId;
-    }
-
-    /**
-     * Gets the role of this arbitration.
-     *
-     * @return the role
-     */
-    public MastershipRole role() {
-        return role;
-    }
-
-    /**
-     * Gets election id of this arbitration.
-     *
-     * @return the election id
-     */
-    public Uint128 electionId() {
-        return electionId;
-    }
-
-    @Override
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
deleted file mode 100644
index 5f5b3b9..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent.Type;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Default implementation of channel event in P4Runtime. It allows passing any type of event.
- * If the event is an error a throwable can be directly passed.
- * Any other type of event cause can be passed as string.
- */
-public class DefaultChannelEvent implements P4RuntimeEventSubject {
-    private DeviceId deviceId;
-    private Type type;
-    private Throwable throwable;
-    private String message;
-
-    /**
-     * Creates channel event with given status and throwable.
-     *
-     * @param deviceId  the device
-     * @param type      error type
-     * @param throwable the cause
-     */
-    public DefaultChannelEvent(DeviceId deviceId, Type type, Throwable throwable) {
-        this.deviceId = deviceId;
-        this.type = type;
-        this.message = throwable.getMessage();
-        this.throwable = throwable;
-    }
-
-    /**
-     * Creates channel event with given status and string cause.
-     *
-     * @param deviceId the device
-     * @param type     error type
-     * @param message    the message
-     */
-    public DefaultChannelEvent(DeviceId deviceId, Type type, String message) {
-        this.deviceId = deviceId;
-        this.type = type;
-        this.message = message;
-        this.throwable = null;
-    }
-
-    /**
-     * Creates channel event with given status, cause and throwable.
-     *
-     * @param deviceId the device
-     * @param type     error type
-     * @param message the message
-     * @param throwable the cause
-     */
-    public DefaultChannelEvent(DeviceId deviceId, Type type, String message, Throwable throwable) {
-        this.deviceId = deviceId;
-        this.type = type;
-        this.message = message;
-        this.throwable = throwable;
-    }
-
-    /**
-     * Gets the type of this event.
-     *
-     * @return the error type
-     */
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Gets the message related to this event.
-     *
-     * @return the message
-     */
-    public String message() {
-        return message;
-    }
-
-
-    /**
-     * Gets throwable of this event.
-     * If no throwable is present returns null.
-     *
-     * @return the throwable
-     */
-    public Throwable throwable() {
-        return throwable;
-    }
-
-    @Override
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
new file mode 100644
index 0000000..ecfe35d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of a generator of P4Runtime election IDs.
+ */
+class DistributedElectionIdGenerator {
+
+    private final Logger log = getLogger(this.getClass());
+
+    private AtomicCounterMap<DeviceId> electionIds;
+
+    /**
+     * Creates a new election ID generator using the given storage service.
+     *
+     * @param storageService storage service
+     */
+    DistributedElectionIdGenerator(StorageService storageService) {
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .build();
+        this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
+                .withName("p4runtime-election-ids")
+                .withSerializer(Serializer.using(serializer))
+                .build();
+    }
+
+    /**
+     * Returns an election ID for the given device ID. The first election ID for
+     * a given device ID is always 1.
+     *
+     * @param deviceId device ID
+     * @return new election ID
+     */
+    BigInteger generate(DeviceId deviceId) {
+        if (electionIds == null) {
+            return null;
+        }
+        // Default value is 0 for AtomicCounterMap.
+        return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
+    }
+
+    /**
+     * Destroy the backing distributed primitive of this generator.
+     */
+    void destroy() {
+        try {
+            electionIds.destroy().get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Exception while destroying distributed counter map", e);
+        } finally {
+            electionIds = null;
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 43d01e5..c952e61 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -30,10 +29,9 @@
 import io.grpc.stub.StreamObserver;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelEvent;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMeterId;
@@ -52,6 +50,8 @@
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.slf4j.Logger;
+import p4.config.v1.P4InfoOuterClass.P4Info;
+import p4.tmp.P4Config;
 import p4.v1.P4RuntimeGrpc;
 import p4.v1.P4RuntimeOuterClass;
 import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -59,7 +59,6 @@
 import p4.v1.P4RuntimeOuterClass.Entity;
 import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
 import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
-import p4.v1.P4RuntimeOuterClass.PacketIn;
 import p4.v1.P4RuntimeOuterClass.ReadRequest;
 import p4.v1.P4RuntimeOuterClass.ReadResponse;
 import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
@@ -69,9 +68,8 @@
 import p4.v1.P4RuntimeOuterClass.Uint128;
 import p4.v1.P4RuntimeOuterClass.Update;
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
-import p4.config.v1.P4InfoOuterClass.P4Info;
-import p4.tmp.P4Config;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -81,7 +79,6 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -99,13 +96,17 @@
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
+import static p4.v1.P4RuntimeOuterClass.PacketIn;
 import static p4.v1.P4RuntimeOuterClass.PacketOut;
 import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
 
 /**
  * Implementation of a P4Runtime client.
  */
-public final class P4RuntimeClientImpl implements P4RuntimeClient {
+final class P4RuntimeClientImpl implements P4RuntimeClient {
+
+    // Timeout in seconds to obtain the client lock.
+    private static final int LOCK_TIMEOUT = 10;
 
     private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
             WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
@@ -116,18 +117,20 @@
 
     private final Logger log = getLogger(getClass());
 
+    private final Lock requestLock = new ReentrantLock();
+    private final Context.CancellableContext cancellableContext =
+            Context.current().withCancellation();
+
     private final DeviceId deviceId;
     private final long p4DeviceId;
     private final P4RuntimeControllerImpl controller;
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
-    private final Context.CancellableContext cancellableContext;
     private final ExecutorService executorService;
     private final Executor contextExecutor;
-    private final Lock writeLock = new ReentrantLock();
     private final StreamObserver<StreamMessageRequest> streamRequestObserver;
 
-    private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
-    protected Uint128 p4RuntimeElectionId;
+    // Used by this client for write requests.
+    private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
 
     /**
      * Default constructor.
@@ -142,45 +145,77 @@
         this.deviceId = deviceId;
         this.p4DeviceId = p4DeviceId;
         this.controller = controller;
-        this.cancellableContext = Context.current().withCancellation();
         this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
-                "onos/p4runtime-client-" + deviceId.toString(),
-                deviceId.toString() + "-%d"));
+                "onos-p4runtime-client-" + deviceId.toString(), "%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
-        //TODO Investigate deadline or timeout in supplyInContext Method
+        //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
-        P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
-        this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
+        this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
+                .streamChannel(new StreamChannelResponseObserver());
     }
 
     /**
-     * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
-     * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
-     * <p>
-     * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
-     * <p>
+     * Submits a task for async execution via the given executor.
+     * All tasks submitted with this method will be executed sequentially.
      */
-    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
+    private <U> CompletableFuture<U> supplyWithExecutor(
+            Supplier<U> supplier, String opDescription, Executor executor) {
         return CompletableFuture.supplyAsync(() -> {
             // TODO: explore a more relaxed locking strategy.
-            writeLock.lock();
+            try {
+                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
+                                      + "please debug (executing {})",
+                              opDescription);
+                    throw new IllegalThreadStateException("Lock timeout");
+                }
+            } catch (InterruptedException e) {
+                log.warn("Thread interrupted while waiting for lock (executing {})",
+                         opDescription);
+                throw new RuntimeException(e);
+            }
             try {
                 return supplier.get();
             } catch (StatusRuntimeException ex) {
-                log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
+                log.warn("Unable to execute {} on {}: {}",
+                         opDescription, deviceId, ex.toString());
                 throw ex;
             } catch (Throwable ex) {
-                log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
+                log.error("Exception in client of {}, executing {}",
+                          deviceId, opDescription, ex);
                 throw ex;
             } finally {
-                writeLock.unlock();
+                requestLock.unlock();
             }
-        }, contextExecutor);
+        }, executor);
+    }
+
+    /**
+     * Equivalent of supplyWithExecutor using the gRPC context executor of this
+     * client, such that if the context is cancelled (e.g. client shutdown) the
+     * RPC is automatically cancelled.
+     */
+    private <U> CompletableFuture<U> supplyInContext(
+            Supplier<U> supplier, String opDescription) {
+        return supplyWithExecutor(supplier, opDescription, contextExecutor);
     }
 
     @Override
-    public CompletableFuture<Boolean> initStreamChannel() {
-        return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
+    public CompletableFuture<Boolean> start() {
+        return supplyInContext(this::doInitStreamChannel,
+                               "start-initStreamChannel");
+    }
+
+    @Override
+    public CompletableFuture<Void> shutdown() {
+        return supplyWithExecutor(this::doShutdown, "shutdown",
+                                  SharedExecutors.getPoolThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> becomeMaster() {
+        return supplyInContext(this::doBecomeMaster,
+                               "becomeMaster");
     }
 
     @Override
@@ -201,6 +236,11 @@
     }
 
     @Override
+    public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
+        return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
+    }
+
+    @Override
     public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
         return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
     }
@@ -245,11 +285,6 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
-        return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
-    }
-
-    @Override
     public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
 
         return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
@@ -272,42 +307,48 @@
 
     /* Blocking method implementations below */
 
-    private boolean doArbitrationUpdate() {
+    private boolean doBecomeMaster() {
+        final Uint128 newId = bigIntegerToUint128(
+                controller.newMasterElectionId(deviceId));
+        if (sendMasterArbitrationUpdate(newId)) {
+            clientElectionId = newId;
+            return true;
+        }
+        return false;
+    }
 
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        // TODO: currently we use 64-bit Long type for election id, should
-        // we use 128-bit ?
-        long nextElectId = controller.getNewMasterElectionId();
-        Uint128 newElectionId = Uint128.newBuilder()
-                .setLow(nextElectId)
-                .build();
-        MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
-                .setDeviceId(p4DeviceId)
-                .setElectionId(newElectionId)
-                .build();
-        StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
-                .setArbitration(arbitrationUpdate)
-                .build();
-        log.debug("Sending arbitration update to {} with election id {}...",
-                  deviceId, newElectionId);
-        arbitrationUpdateMap.put(newElectionId, result);
+    private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
+        log.info("Sending arbitration update to {}... electionId={}",
+                  deviceId, uint128ToBigInteger(electionId));
         try {
-            streamRequestObserver.onNext(requestMsg);
-            return result.get();
+            streamRequestObserver.onNext(
+                    StreamMessageRequest.newBuilder()
+                            .setArbitration(
+                                    MasterArbitrationUpdate
+                                            .newBuilder()
+                                            .setDeviceId(p4DeviceId)
+                                            .setElectionId(electionId)
+                                            .build())
+                            .build());
+            return true;
         } catch (StatusRuntimeException e) {
             log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
-            arbitrationUpdateMap.remove(newElectionId);
-            return false;
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Arbitration update failed for {} due to {}", deviceId, e);
-            arbitrationUpdateMap.remove(newElectionId);
-            return false;
         }
+        return false;
     }
+
     private boolean doInitStreamChannel() {
         // To listen for packets and other events, we need to start the RPC.
-        // Here we do it by sending a master arbitration update.
-        return doArbitrationUpdate();
+        // Here we send an empty StreamMessageRequest.
+        try {
+            log.info("Starting stream channel with {}...", deviceId);
+            streamRequestObserver.onNext(StreamMessageRequest.newBuilder().build());
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.error("Unable to start stream channel with {}: {}",
+                      deviceId, e.getMessage());
+            return false;
+        }
     }
 
     private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -338,7 +379,7 @@
         SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
                 .newBuilder()
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .setAction(VERIFY_AND_COMMIT)
                 .setConfig(pipelineConfig)
                 .build();
@@ -380,7 +421,7 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
 
@@ -388,7 +429,7 @@
             blockingStub.write(writeRequestBuilder.build());
             return true;
         } catch (StatusRuntimeException e) {
-            logWriteErrors(piTableEntries, e, opType, "table entry");
+            checkAndLogWriteErrors(piTableEntries, e, opType, "table entry");
             return false;
         }
     }
@@ -397,13 +438,18 @@
 
         log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
 
-        P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
         int tableId;
-        try {
-            tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
-        } catch (P4InfoBrowser.NotFoundException e) {
-            log.warn("Unable to dump table: {}", e.getMessage());
-            return Collections.emptyList();
+        if (piTableId == null) {
+            // Dump all tables.
+            tableId = 0;
+        } else {
+            P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+            try {
+                tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
+            } catch (P4InfoBrowser.NotFoundException e) {
+                log.warn("Unable to dump table: {}", e.getMessage());
+                return Collections.emptyList();
+            }
         }
 
         ReadRequest requestMsg = ReadRequest.newBuilder()
@@ -474,40 +520,29 @@
         }
         // Decode packet message and post event.
         PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
-        DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
+        PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
         P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
         log.debug("Received packet in: {}", event);
         controller.postEvent(event);
     }
 
-    private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
-        log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
-
-        Uint128 electionId = arbitrationMsg.getElectionId();
-        CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
-
-        if (mastershipFeature == null) {
-            log.warn("Can't find completable future of election id {}", electionId);
+    private void doArbitrationResponse(MasterArbitrationUpdate msg) {
+        // From the spec...
+        // - Election_id: The stream RPC with the highest election_id is the
+        // master. Switch populates with the highest election ID it
+        // has received from all connected controllers.
+        // - Status: Switch populates this with OK for the client that is the
+        // master, and with an error status for all other connected clients (at
+        // every mastership change).
+        if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-
-        this.p4RuntimeElectionId = electionId;
-        int statusCode = arbitrationMsg.getStatus().getCode();
-        MastershipRole arbitrationRole;
-        // arbitration update success
-
-        if (statusCode == Status.OK.getCode().value()) {
-            mastershipFeature.complete(true);
-            arbitrationRole = MastershipRole.MASTER;
-        } else {
-            mastershipFeature.complete(false);
-            arbitrationRole = MastershipRole.STANDBY;
-        }
-
-        DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
-        P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
-                                                  arbitrationEventSubject);
-        controller.postEvent(event);
+        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.info("Received arbitration update from {}: isMaster={}, electionId={}",
+                 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        controller.postEvent(new P4RuntimeEvent(
+                P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+                new ArbitrationResponse(deviceId, isMaster)));
     }
 
     private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -583,14 +618,14 @@
 
         WriteRequest writeRequestMsg = WriteRequest.newBuilder()
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
         try {
             blockingStub.write(writeRequestMsg);
             return true;
         } catch (StatusRuntimeException e) {
-            logWriteErrors(members, e, opType, "group member");
+            checkAndLogWriteErrors(members, e, opType, "group member");
             return false;
         }
     }
@@ -729,7 +764,7 @@
 
         final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addUpdates(Update.newBuilder()
                                     .setEntity(Entity.newBuilder()
                                                        .setActionProfileGroup(actionProfileGroup)
@@ -741,7 +776,7 @@
             blockingStub.write(writeRequestMsg);
             return true;
         } catch (StatusRuntimeException e) {
-            logWriteErrors(Collections.singleton(group), e, opType, "group");
+            checkAndLogWriteErrors(Collections.singleton(group), e, opType, "group");
             return false;
         }
     }
@@ -814,7 +849,7 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
         try {
@@ -827,53 +862,34 @@
         }
     }
 
-    /**
-     * Returns the internal P4 device ID associated with this client.
-     *
-     * @return P4 device ID
-     */
-    public long p4DeviceId() {
-        return p4DeviceId;
-    }
-
-    /**
-     * For testing purpose only. TODO: remove before release.
-     *
-     * @return blocking stub
-     */
-    public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
-        return this.blockingStub;
-    }
-
-
-    @Override
-    public void shutdown() {
-
+    private Void doShutdown() {
         log.info("Shutting down client for {}...", deviceId);
-
-        writeLock.lock();
-        try {
-            if (streamRequestObserver != null) {
-                streamRequestObserver.onCompleted();
-                cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
-            }
-
-            this.executorService.shutdown();
+        if (streamRequestObserver != null) {
             try {
-                executorService.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                log.warn("Executor service didn't shutdown in time.");
-                Thread.currentThread().interrupt();
+                streamRequestObserver.onCompleted();
+            } catch (IllegalStateException e) {
+                // Thrown if stream channel is already completed. Can ignore.
+                log.debug("Ignored expection: {}", e);
             }
-        } finally {
-            writeLock.unlock();
+            cancellableContext.cancel(new InterruptedException(
+                    "Requested client shutdown"));
         }
+        this.executorService.shutdown();
+        try {
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Executor service didn't shutdown in time.");
+            Thread.currentThread().interrupt();
+        }
+        return null;
     }
 
-    private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
-                                                     StatusRuntimeException ex,
-                                                     WriteOperationType opType,
-                                                     String entryType) {
+    private <E extends PiEntity> void checkAndLogWriteErrors(
+            Collection<E> writeEntities, StatusRuntimeException ex,
+            WriteOperationType opType, String entryType) {
+
+        checkGrpcException(ex);
+
         List<P4RuntimeOuterClass.Error> errors = null;
         String description = null;
         try {
@@ -946,10 +962,80 @@
                       err.hasDetails() ? "\n" + err.getDetails().toString() : "");
     }
 
+    private void checkGrpcException(StatusRuntimeException ex) {
+        switch (ex.getStatus().getCode()) {
+            case OK:
+                break;
+            case CANCELLED:
+                break;
+            case UNKNOWN:
+                break;
+            case INVALID_ARGUMENT:
+                break;
+            case DEADLINE_EXCEEDED:
+                break;
+            case NOT_FOUND:
+                break;
+            case ALREADY_EXISTS:
+                break;
+            case PERMISSION_DENIED:
+                // Notify upper layers that this node is not master.
+                controller.postEvent(new P4RuntimeEvent(
+                        P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+                        new ArbitrationResponse(deviceId, false)));
+                break;
+            case RESOURCE_EXHAUSTED:
+                break;
+            case FAILED_PRECONDITION:
+                break;
+            case ABORTED:
+                break;
+            case OUT_OF_RANGE:
+                break;
+            case UNIMPLEMENTED:
+                break;
+            case INTERNAL:
+                break;
+            case UNAVAILABLE:
+                // Channel might be closed.
+                controller.postEvent(new P4RuntimeEvent(
+                        P4RuntimeEvent.Type.CHANNEL_EVENT,
+                        new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
+                break;
+            case DATA_LOSS:
+                break;
+            case UNAUTHENTICATED:
+                break;
+            default:
+                break;
+        }
+    }
+
+    private Uint128 bigIntegerToUint128(BigInteger value) {
+        final byte[] arr = value.toByteArray();
+        final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
+                .put(new byte[Long.BYTES * 2 - arr.length])
+                .put(arr);
+        bb.rewind();
+        return Uint128.newBuilder()
+                .setHigh(bb.getLong())
+                .setLow(bb.getLong())
+                .build();
+    }
+
+    private BigInteger uint128ToBigInteger(Uint128 value) {
+        return new BigInteger(
+                ByteBuffer.allocate(Long.BYTES * 2)
+                        .putLong(value.getHigh())
+                        .putLong(value.getLow())
+                        .array());
+    }
+
     /**
      * Handles messages received from the device on the stream channel.
      */
-    private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
+    private class StreamChannelResponseObserver
+            implements StreamObserver<StreamMessageResponse> {
 
         @Override
         public void onNext(StreamMessageResponse message) {
@@ -958,40 +1044,40 @@
 
         private void doNext(StreamMessageResponse message) {
             try {
-                log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+                log.debug("Received message on stream channel from {}: {}",
+                          deviceId, message.getUpdateCase());
                 switch (message.getUpdateCase()) {
                     case PACKET:
-                        // Packet-in
                         doPacketIn(message.getPacket());
                         return;
                     case ARBITRATION:
-                        doArbitrationUpdateFromDevice(message.getArbitration());
+                        doArbitrationResponse(message.getArbitration());
                         return;
                     default:
-                        log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+                        log.warn("Unrecognized stream message from {}: {}",
+                                 deviceId, message.getUpdateCase());
                 }
             } catch (Throwable ex) {
-                log.error("Exception while processing stream channel message from {}", deviceId, ex);
+                log.error("Exception while processing stream message from {}",
+                          deviceId, ex);
             }
         }
 
         @Override
         public void onError(Throwable throwable) {
-            log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
-            controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
-                            throwable)));
-            // FIXME: we might want to recreate the channel.
-            // In general, we want to be robust against any transient error and, if the channel is open, make sure the
-            // stream channel is always on.
+            log.warn("Error on stream channel for {}: {}",
+                     deviceId, Status.fromThrowable(throwable));
+            controller.postEvent(new P4RuntimeEvent(
+                    P4RuntimeEvent.Type.CHANNEL_EVENT,
+                    new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
-                            "Stream channel has completed")));
+            controller.postEvent(new P4RuntimeEvent(
+                    P4RuntimeEvent.Type.CHANNEL_EVENT,
+                    new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 987356b..d2773b2 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,20 +35,21 @@
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigInteger;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,14 +66,13 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private static final String P4R_ELECTION = "p4runtime-election";
     private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
     private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-    private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
     private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
             .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
             .build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -81,8 +81,7 @@
                     return new ReentrantReadWriteLock();
                 }
             });
-
-    private AtomicCounter electionIdGenerator;
+    private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private GrpcController grpcController;
@@ -93,8 +92,7 @@
     @Activate
     public void activate() {
         eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
-        electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
-
+        electionIdGenerator = new DistributedElectionIdGenerator(storageService);
         log.info("Started");
     }
 
@@ -102,6 +100,8 @@
     @Deactivate
     public void deactivate() {
         grpcController = null;
+        electionIdGenerator.destroy();
+        electionIdGenerator = null;
         eventDispatcher.removeSink(P4RuntimeEvent.class);
         log.info("Stopped");
     }
@@ -119,13 +119,13 @@
                 .usePlaintext(true);
 
         deviceLocks.getUnchecked(deviceId).writeLock().lock();
-        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                 deviceId, serverAddr, serverPort, p4DeviceId);
 
         try {
             if (deviceIdToClientKey.containsKey(deviceId)) {
                 final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
                 if (newKey.equals(existingKey)) {
+                    log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                             deviceId, serverAddr, serverPort, p4DeviceId);
                     return true;
                 } else {
                     throw new IllegalStateException(
@@ -133,6 +133,8 @@
                                     "server endpoints already exists");
                 }
             } else {
+                log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+                         deviceId, serverAddr, serverPort, p4DeviceId);
                 return doCreateClient(newKey, channelBuilder);
             }
         } finally {
@@ -187,12 +189,11 @@
     public void removeClient(DeviceId deviceId) {
 
         deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
         try {
             if (deviceIdToClientKey.containsKey(deviceId)) {
                 final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
-                grpcController.disconnectChannel(channelIds.get(deviceId));
                 clientKeyToClient.remove(clientKey).shutdown();
+                grpcController.disconnectChannel(channelIds.get(deviceId));
                 deviceIdToClientKey.remove(deviceId);
                 channelIds.remove(deviceId);
             }
@@ -222,7 +223,7 @@
                 log.debug("No client for {}, can't check for reachability", deviceId);
                 return false;
             }
-
+            // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
             return grpcController.isChannelOpen(channelIds.get(deviceId));
         } finally {
             deviceLocks.getUnchecked(deviceId).readLock().unlock();
@@ -230,67 +231,73 @@
     }
 
     @Override
-    public long getNewMasterElectionId() {
-        return electionIdGenerator.incrementAndGet();
+    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
+        deviceAgentListeners.get(deviceId).add(listener);
     }
 
     @Override
-    public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
-        channelListeners.compute(deviceId, (devId, listeners) -> {
-            List<ChannelListener> newListeners;
-            if (listeners != null) {
-                newListeners = listeners;
-            } else {
-                newListeners = new ArrayList<>();
-            }
-            newListeners.add(listener);
-            return newListeners;
+    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+            listeners.remove(listener);
+            return listeners;
         });
     }
 
-    @Override
-    public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
-        channelListeners.compute(deviceId, (devId, listeners) -> {
-            if (listeners != null) {
-                listeners.remove(listener);
-                return listeners;
-            } else {
-                log.debug("Device {} has no listener registered", deviceId);
-                return null;
-            }
-        });
+    BigInteger newMasterElectionId(DeviceId deviceId) {
+        return electionIdGenerator.generate(deviceId);
     }
 
     void postEvent(P4RuntimeEvent event) {
-        if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
-            DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
-            DeviceId deviceId = event.subject().deviceId();
-            ChannelEvent channelEvent = null;
-            //If disconnection is already known we propagate it.
-            if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
-                channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
-                        channelError.throwable());
-            } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
-                //If we don't know what the error is we check for reachability
-                if (!isReacheable(deviceId)) {
-                    //if false the channel has disconnected
-                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
-                            channelError.throwable());
-                } else {
-                    // else we propagate the event.
-                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
-                            channelError.throwable());
-                }
-            }
-            //Ignoring CHANNEL_CONNECTED
-            if (channelEvent != null && channelListeners.get(deviceId) != null) {
-                for (ChannelListener listener : channelListeners.get(deviceId)) {
-                    listener.event(channelEvent);
-                }
-            }
-        } else {
-            post(event);
+        switch (event.type()) {
+            case CHANNEL_EVENT:
+                handleChannelEvent(event);
+                break;
+            case ARBITRATION_RESPONSE:
+                handleArbitrationReply(event);
+                break;
+            default:
+                post(event);
+                break;
         }
     }
 
+    private void handleChannelEvent(P4RuntimeEvent event) {
+        final ChannelEvent channelEvent = (ChannelEvent) event.subject();
+        final DeviceId deviceId = channelEvent.deviceId();
+        final DeviceAgentEvent.Type agentEventType;
+        switch (channelEvent.type()) {
+            case OPEN:
+                agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                break;
+            case CLOSED:
+                agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            case ERROR:
+                agentEventType = !isReacheable(deviceId)
+                        ? DeviceAgentEvent.Type.CHANNEL_CLOSED
+                        : DeviceAgentEvent.Type.CHANNEL_ERROR;
+                break;
+            default:
+                log.warn("Unrecognized channel event type {}", channelEvent.type());
+                return;
+        }
+        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
+    }
+
+    private void handleArbitrationReply(P4RuntimeEvent event) {
+        final DeviceId deviceId = event.subject().deviceId();
+        final ArbitrationResponse response = (ArbitrationResponse) event.subject();
+        final DeviceAgentEvent.Type roleType = response.isMaster()
+                ? DeviceAgentEvent.Type.ROLE_MASTER
+                : DeviceAgentEvent.Type.ROLE_STANDBY;
+        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
+                roleType, response.deviceId()));
+    }
+
+    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+        if (deviceAgentListeners.containsKey(deviceId)) {
+            deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
+        }
+    }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
similarity index 89%
rename from protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
rename to protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
index a1cce46..dddee2b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
@@ -25,14 +25,14 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * Default implementation of a packet-in in P4Runtime.
+ * P4Runtime packet-in.
  */
-final class DefaultPacketIn implements P4RuntimePacketIn {
+final class PacketInEvent implements P4RuntimePacketIn {
 
     private final DeviceId deviceId;
     private final PiPacketOperation operation;
 
-    DefaultPacketIn(DeviceId deviceId, PiPacketOperation operation) {
+    PacketInEvent(DeviceId deviceId, PiPacketOperation operation) {
         this.deviceId = checkNotNull(deviceId);
         this.operation = checkNotNull(operation);
     }
@@ -55,7 +55,7 @@
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        DefaultPacketIn that = (DefaultPacketIn) o;
+        PacketInEvent that = (PacketInEvent) o;
         return Objects.equal(deviceId, that.deviceId) &&
                 Objects.equal(operation, that.operation);
     }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 1e83577..893049f 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -144,7 +144,7 @@
     }
 
     @AfterClass
-    public static void globalTeerDown() {
+    public static void globalTearDown() {
         grpcServer.shutdown();
         grpcChannel.shutdown();
     }
@@ -156,7 +156,7 @@
         client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
                                          grpcChannel,
                                          controller);
-        client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
+        client.becomeMaster();
     }
 
     @Test
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
similarity index 88%
rename from protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
rename to protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
index 5993142..0a1f82e 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
@@ -32,7 +32,7 @@
 /**
  * Test for DefaultPacketIn class.
  */
-public class DefaultPacketInTest {
+public class PacketInEventTest {
 
     private static final int DEFAULT_ORIGINAL_VALUE = 255;
     private static final int DEFAULT_BIT_WIDTH = 9;
@@ -46,10 +46,10 @@
     private PiPacketOperation packetOperation2;
     private PiPacketOperation nullPacketOperation = null;
 
-    private DefaultPacketIn packetIn;
-    private DefaultPacketIn sameAsPacketIn;
-    private DefaultPacketIn packetIn2;
-    private DefaultPacketIn packetIn3;
+    private PacketInEvent packetIn;
+    private PacketInEvent sameAsPacketIn;
+    private PacketInEvent packetIn2;
+    private PacketInEvent packetIn3;
 
     /**
      * Setup method for packetOperation and packetOperation2.
@@ -78,10 +78,10 @@
                                       .build())
                 .build();
 
-        packetIn = new DefaultPacketIn(deviceId, packetOperation);
-        sameAsPacketIn = new DefaultPacketIn(sameDeviceId, packetOperation);
-        packetIn2 = new DefaultPacketIn(deviceId2, packetOperation);
-        packetIn3 = new DefaultPacketIn(deviceId, packetOperation2);
+        packetIn = new PacketInEvent(deviceId, packetOperation);
+        sameAsPacketIn = new PacketInEvent(sameDeviceId, packetOperation);
+        packetIn2 = new PacketInEvent(deviceId2, packetOperation);
+        packetIn3 = new PacketInEvent(deviceId, packetOperation2);
     }
 
     /**
@@ -105,7 +105,7 @@
     @Test(expected = NullPointerException.class)
     public void testConstructorWithNullDeviceId() {
 
-        new DefaultPacketIn(nullDeviceId, packetOperation);
+        new PacketInEvent(nullDeviceId, packetOperation);
     }
 
     /**
@@ -114,7 +114,7 @@
     @Test(expected = NullPointerException.class)
     public void testConstructorWithNullPacketOperation() {
 
-        new DefaultPacketIn(deviceId, nullPacketOperation);
+        new PacketInEvent(deviceId, nullPacketOperation);
     }
 
     /**