[ONOS-7143] Add master arbitration update support to P4RuntimeClient

Change-Id: I671275576018d50447f969166a7b42a28dd93b1d
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 7c75c91..5072c65 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -156,5 +156,12 @@
      */
     void shutdown();
 
+    /**
+     * Sends a master arbitration update to the device.
+     *
+     * @return a completable future containing true if the operation was successful; false otherwise
+     */
+    CompletableFuture<Boolean> sendMasterArbitrationUpdate();
+
     // TODO: work in progress.
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index b095bca..3e015a4 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -75,4 +75,11 @@
      * @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
      */
     boolean isReacheable(DeviceId deviceId);
+
+    /**
+     * Gets new election id for device arbitration request.
+     *
+     * @return the election id
+     */
+    long getNewMasterElectionId();
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index 1f04374..ab0146f 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -33,7 +33,11 @@
          * A packet-in.
          */
         PACKET_IN,
-        // TODO: add mastership, device as soon as we define those.
+
+        /**
+         * Arbitration reply.
+         */
+        ARBITRATION,
     }
 
     public P4RuntimeEvent(Type type, P4RuntimeEventSubject subject) {
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
new file mode 100644
index 0000000..7370dc7
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.MastershipRole;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+import p4.P4RuntimeOuterClass.Uint128;
+
+/**
+ * Default implementation of arbitration in P4Runtime.
+ */
+public class DefaultArbitration implements P4RuntimeEventSubject {
+    private final MastershipRole role;
+    private final Uint128 electionId;
+
+    /**
+     * Creates an immutable arbitration with the given role and election id.
+     *
+     * @param role the mastership role carried by this arbitration
+     * @param electionId the election id carried by this arbitration
+     */
+    public DefaultArbitration(MastershipRole role, Uint128 electionId) {
+        this.role = role;
+        this.electionId = electionId;
+    }
+
+    /**
+     * Gets the role of this arbitration.
+     *
+     * @return the role passed at construction time
+     */
+    public MastershipRole role() {
+        return role;
+    }
+
+    /**
+     * Gets the election id of this arbitration.
+     *
+     * @return the election id passed at construction time
+     */
+    public Uint128 electionId() {
+        return electionId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 21bd31b..738417e 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -31,6 +31,7 @@
 import org.onlab.osgi.DefaultServiceDirectory;
 import org.onlab.util.Tools;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiActionGroup;
 import org.onosproject.net.pi.runtime.PiActionGroupMember;
@@ -60,6 +61,7 @@
 import p4.P4RuntimeOuterClass.StreamMessageRequest;
 import p4.P4RuntimeOuterClass.StreamMessageResponse;
 import p4.P4RuntimeOuterClass.TableEntry;
+import p4.P4RuntimeOuterClass.Uint128;
 import p4.P4RuntimeOuterClass.Update;
 import p4.P4RuntimeOuterClass.WriteRequest;
 import p4.config.P4InfoOuterClass.P4Info;
@@ -74,6 +76,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -104,6 +107,7 @@
             WriteOperationType.MODIFY, Update.Type.MODIFY,
             WriteOperationType.DELETE, Update.Type.DELETE
     );
+    private static final String ARBITRATION_RESULT_MASTER = "Is master";
 
     private final Logger log = getLogger(getClass());
 
@@ -117,6 +121,9 @@
     private final Lock writeLock = new ReentrantLock();
     private final StreamObserver<StreamMessageRequest> streamRequestObserver;
 
+    private final Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
+    protected volatile Uint128 p4RuntimeElectionId = Uint128.getDefaultInstance(); // never null
+
     /**
      * Default constructor.
      *
@@ -257,34 +264,44 @@
                                "dumpGroups-" + actionProfileId.id());
     }
 
+    @Override
+    public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
+        return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
+    }
+
     /* Blocking method implementations below */
 
+    /**
+     * Sends an arbitration update with a new election id and blocks until the matching reply arrives.
+     * @return true if the arbitration reply completed the request successfully; false otherwise
+     */
+    private boolean doArbitrationUpdate() {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        // TODO: the election id is a 64-bit long for now; should we use all 128 bits?
+        Uint128 newElectionId = Uint128.newBuilder().setLow(controller.getNewMasterElectionId()).build();
+        MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
+                .setDeviceId(p4DeviceId).setElectionId(newElectionId).build();
+        StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
+                .setArbitration(arbitrationUpdate).build();
+        log.debug("Sending arbitration update to {} with election id {}...", deviceId, newElectionId);
+        arbitrationUpdateMap.put(newElectionId, result);
+        try {
+            streamRequestObserver.onNext(requestMsg);
+            return result.get();
+        } catch (InterruptedException | ExecutionException | StatusRuntimeException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt(); // restore the interrupt status for callers
+            }
+            log.warn("Arbitration update failed for {} due to {}", deviceId, e);
+            arbitrationUpdateMap.remove(newElectionId);
+            return false;
+        }
+    }
+
     private boolean doInitStreamChannel() {
         // To listen for packets and other events, we need to start the RPC.
         // Here we do it by sending a master arbitration update.
-        log.info("initializing stream chanel on {}...", deviceId);
-        if (!doArbitrationUpdate()) {
-            log.warn("Unable to initialize stream channel for {}", deviceId);
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    private boolean doArbitrationUpdate() {
-        log.info("Sending arbitration update to {}...", deviceId);
-        StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
-                .setArbitration(MasterArbitrationUpdate.newBuilder()
-                                        .setDeviceId(p4DeviceId)
-                                        .build())
-                .build();
-        try {
-            streamRequestObserver.onNext(requestMsg);
-            return true;
-        } catch (StatusRuntimeException e) {
-            log.warn("Arbitration update failed for {}: {}", deviceId, e);
-            return false;
-        }
+        return doArbitrationUpdate();
     }
 
     private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -315,6 +332,7 @@
 
         SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
                 .newBuilder()
+                .setElectionId(p4RuntimeElectionId)
                 .setAction(VERIFY_AND_COMMIT)
                 .addConfigs(pipelineConfig)
                 .build();
@@ -330,7 +348,6 @@
 
     private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
                                         PiPipeconf pipeconf) {
-
         WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
 
         Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
@@ -350,11 +367,7 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
-                /* PI ignores this ElectionId, commenting out for now.
-                .setElectionId(Uint128.newBuilder()
-                                       .setHigh(0)
-                                       .setLow(ELECTION_ID)
-                                       .build()) */
+                .setElectionId(p4RuntimeElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
 
@@ -455,8 +468,33 @@
     }
 
     private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
+        log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
 
-        log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
+        Uint128 electionId = arbitrationMsg.getElectionId();
+        CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
+
+        if (mastershipFeature == null) {
+            log.warn("Can't find completable future of election id {}", electionId);
+            return;
+        }
+
+        this.p4RuntimeElectionId = electionId;
+        int statusCode = arbitrationMsg.getStatus().getCode();
+        MastershipRole arbitrationRole;
+        // arbitration update success
+
+        if (statusCode == Status.OK.getCode().value()) {
+            mastershipFeature.complete(true);
+            arbitrationRole = MastershipRole.MASTER;
+        } else {
+            mastershipFeature.complete(false);
+            arbitrationRole = MastershipRole.STANDBY;
+        }
+
+        DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
+        P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
+                                                  arbitrationEventSubject);
+        controller.postEvent(event);
     }
 
     private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
@@ -490,7 +528,6 @@
     }
 
     private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
-
         final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
         try {
             for (PiActionGroupMember member : group.members()) {
@@ -518,6 +555,7 @@
 
         WriteRequest writeRequestMsg = WriteRequest.newBuilder()
                 .setDeviceId(p4DeviceId)
+                .setElectionId(p4RuntimeElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
         try {
@@ -652,7 +690,6 @@
     }
 
     private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
-
         final ActionProfileGroup actionProfileGroup;
         try {
             actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
@@ -663,6 +700,7 @@
 
         final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
                 .setDeviceId(p4DeviceId)
+                .setElectionId(p4RuntimeElectionId)
                 .addUpdates(Update.newBuilder()
                                     .setEntity(Entity.newBuilder()
                                                        .setActionProfileGroup(actionProfileGroup)
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 383b857..57703a1 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,6 +35,8 @@
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -55,19 +57,26 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
+    private static final String P4R_ELECTION = "p4runtime-election";
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
     // TODO: should use a cache to delete unused locks.
     private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
+    private AtomicCounter electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     public GrpcController grpcController;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    public StorageService storageService;
+
     @Activate
     public void activate() {
         eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
+        electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
+
         log.info("Started");
     }
 
@@ -186,6 +195,11 @@
         }
     }
 
+    @Override
+    public long getNewMasterElectionId() {
+        return electionIdGenerator.incrementAndGet();
+    }
+
     public void postEvent(P4RuntimeEvent event) {
         post(event);
     }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index fda5053..debf6a6 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -48,6 +48,7 @@
 import p4.P4RuntimeOuterClass.ActionProfileGroup;
 import p4.P4RuntimeOuterClass.ActionProfileMember;
 import p4.P4RuntimeOuterClass.Entity;
+import p4.P4RuntimeOuterClass.Uint128;
 import p4.P4RuntimeOuterClass.Update;
 import p4.P4RuntimeOuterClass.WriteRequest;
 
@@ -101,6 +102,7 @@
     private static final int SET_EGRESS_PORT_ID = 16794308;
     private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
     private static final long DEFAULT_TIMEOUT_TIME = 10;
+    private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
 
     private P4RuntimeClientImpl client;
     private P4RuntimeControllerImpl controller;
@@ -156,6 +158,7 @@
         client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
                                          grpcChannel,
                                          controller);
+        client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
     }
 
     @Test
@@ -166,6 +169,7 @@
         WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
         assertEquals(1, result.getDeviceId());
         assertEquals(1, result.getUpdatesCount());
+        assertEquals(DEFAULT_ELECTION_ID, result.getElectionId());
 
         Update update = result.getUpdatesList().get(0);
         assertEquals(Update.Type.INSERT, update.getType());
@@ -194,6 +198,7 @@
         WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
         assertEquals(1, result.getDeviceId());
         assertEquals(3, result.getUpdatesCount());
+        assertEquals(DEFAULT_ELECTION_ID, result.getElectionId());
 
         List<Update> updates = result.getUpdatesList();
         for (Update update : updates) {
diff --git a/protocols/p4runtime/proto/BUCK b/protocols/p4runtime/proto/BUCK
index a50c71f..f8623f8 100644
--- a/protocols/p4runtime/proto/BUCK
+++ b/protocols/p4runtime/proto/BUCK
@@ -5,7 +5,7 @@
 PROTOBUF_VER = '3.0.2'
 GRPC_VER = '1.3.0'
 
-PI_COMMIT = '9fc50cd0a0187eb1346272524d4b8bafb51bb513'
+PI_COMMIT = 'a8814a8ac40838a9df83fe47a17a025b69026fcf'
 PI_BASEURL = 'https://github.com/p4lang/PI.git'
 
 # Wondering which .proto files to build? Check p4runtime's Makefile: