More robust deferring for P4Runtime MasterArbitrationUpdate messages

The P4RuntimeClient implements a deferring mechanism to avoid becoming
master when it shouldn't, i.e. when the requested election ID is bigger
than the master one on the device, but it was not asked to be master.
We rely on a distributed primitive to make sure all nodes are aware of
the latest master election ID on the device. To cope with lost
primitive update events, this patch adds a periodic retry and a maximum
timeout after which the MasterArbitrationUpdate message is always sent.

Change-Id: I98669dac3fc79173b9e98f41e11a4d08901c6cb1
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index df4844b..11e91aa 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -21,6 +21,7 @@
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
+import org.onlab.util.SharedScheduledExecutors;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.pi.model.PiPipeconf;
@@ -41,11 +42,14 @@
 import java.math.BigInteger;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
 import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -57,6 +61,9 @@
 
     private static final Logger log = getLogger(StreamClientImpl.class);
 
+    private static final int ARBITRATION_RETRY_SECONDS = 3;
+    private static final int ARBITRATION_TIMEOUT_SECONDS = 15;
+
     private final P4RuntimeClientImpl client;
     private final DeviceId deviceId;
     private final long p4DeviceId;
@@ -73,6 +80,9 @@
     private BigInteger pendingElectionId = null;
     private BigInteger lastUsedElectionId = null;
 
+    private ScheduledFuture<?> pendingElectionIdRetryTask = null;
+    private long pendingElectionIdTimestamp = 0;
+
     StreamClientImpl(
             PiPipeconfService pipeconfService,
             MasterElectionIdStore masterElectionIdStore,
@@ -120,23 +130,54 @@
                 // No pending requests.
                 return;
             }
-            if (!requestedToBeMaster.get() && masterElectionId != null
-                    && pendingElectionId.compareTo(masterElectionId) > 0) {
-                log.info("Deferring sending master arbitration update, master " +
-                                  "election ID of server ({}) is smaller than " +
-                                  "requested one ({}), but we do NOT want to be master...",
-                          masterElectionId, pendingElectionId);
-                // Will try again as soon as the server reports a new master
-                // election ID that is bigger than the pending non-master one.
+            // Cancel any pending task. We'll reschedule if needed.
+            if (pendingElectionIdRetryTask != null) {
+                // Do not interrupt if running, as this might be executed by the
+                // pending task itself.
+                pendingElectionIdRetryTask.cancel(false);
+                pendingElectionIdRetryTask = null;
+            }
+            // We implement a deferring mechanism to avoid becoming master when
+            // we shouldn't, i.e. when the requested election ID is bigger than
+            // the master one on the device, but we don't want to be master.
+            // However, we make sure not to defer for more than
+            // ARBITRATION_TIMEOUT_SECONDS.
+            final boolean timeoutExpired;
+            if (pendingElectionIdTimestamp == 0) {
+                pendingElectionIdTimestamp = currentTimeMillis();
+                timeoutExpired = false;
+            } else {
+                timeoutExpired = (currentTimeMillis() - pendingElectionIdTimestamp)
+                        > ARBITRATION_TIMEOUT_SECONDS * 1000;
+            }
+            if (timeoutExpired) {
+                log.warn("{} arbitration timeout expired! Will send pending election ID now...",
+                         deviceId);
+            }
+            if (!timeoutExpired &&
+                    !requestedToBeMaster.get() && masterElectionId != null &&
+                    pendingElectionId.compareTo(masterElectionId) > 0) {
+                log.info("Deferring sending master arbitration update for {}, master " +
+                                 "election ID of server ({}) is smaller than " +
+                                 "requested one ({}), but we do NOT want to be master...",
+                         deviceId, masterElectionId, pendingElectionId);
+                // Will try again as soon as the master election ID store is
+                // updated...
                 masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+                // ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
+                // the store event).
+                pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
+                        () -> handlePendingElectionId(masterElectionIdStore.get(deviceId)),
+                        ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
             } else {
                 // Send now.
                 log.info("Setting mastership on {}... " +
-                                  "master={}, newElectionId={}, masterElectionId={}",
-                          deviceId, requestedToBeMaster.get(),
-                          pendingElectionId, masterElectionId);
+                                 "master={}, newElectionId={}, masterElectionId={}",
+                         deviceId, requestedToBeMaster.get(),
+                         pendingElectionId, masterElectionId);
                 sendMasterArbitrationUpdate(pendingElectionId);
                 pendingElectionId = null;
+                pendingElectionIdTimestamp = 0;
                 // No need to listen for master election ID changes.
                 masterElectionIdStore.unsetListener(deviceId);
             }
@@ -151,8 +192,8 @@
     @Override
     public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
         if (!isSessionOpen()) {
-            log.debug("Dropping packet-out request for {}, session is closed",
-                      deviceId);
+            log.warn("Dropping packet-out request for {}, session is closed",
+                     deviceId);
             return;
         }
         if (log.isTraceEnabled()) {
@@ -284,6 +325,8 @@
 
         @Override
         public void updated(BigInteger masterElectionId) {
+            log.debug("UPDATED master election ID of {}: {}",
+                      deviceId, masterElectionId);
             handlePendingElectionId(masterElectionId);
         }
     }