Merge branch 'master' into dev-karaf-4.2.1

Change-Id: I86b9d80581cd76a7c20e05201023090f9692d1ab
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index 079ab62f4..caa72fe 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -17,15 +17,21 @@
 package org.onosproject.drivers.p4runtime.mirror;
 
 import com.google.common.annotations.Beta;
+import com.google.common.collect.Maps;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.runtime.PiEntity;
 import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
@@ -33,9 +39,12 @@
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -56,8 +65,14 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private StorageService storageService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private PiPipeconfWatchdogService pipeconfWatchdogService;
+
     private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
 
+    private final PiPipeconfWatchdogListener pipeconfListener =
+            new InternalPipeconfWatchdogListener();
+
     @Activate
     public void activate() {
         mirrorMap = storageService
@@ -66,6 +81,7 @@
                 .withSerializer(storeSerializer())
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
+        pipeconfWatchdogService.addListener(pipeconfListener);
         log.info("Started");
     }
 
@@ -75,6 +91,7 @@
 
     @Deactivate
     public void deactivate() {
+        pipeconfWatchdogService.removeListener(pipeconfListener);
         mirrorMap.destroy();
         mirrorMap = null;
         log.info("Stopped");
@@ -99,6 +116,14 @@
     public void put(H handle, E entry) {
         checkNotNull(handle);
         checkNotNull(entry);
+        final PiPipeconfWatchdogService.PipelineStatus status =
+                pipeconfWatchdogService.getStatus(handle.deviceId());
+        if (!status.equals(READY)) {
+            log.info("Ignoring device mirror update because pipeline " +
+                             "status of {} is {}: {}",
+                     handle.deviceId(), status, entry);
+            return;
+        }
         final long now = new WallClockTimestamp().unixTimestamp();
         final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
         mirrorMap.put(handle, timedEntry);
@@ -110,4 +135,77 @@
         mirrorMap.remove(handle);
     }
 
+    @Override
+    public void sync(DeviceId deviceId, Map<H, E> deviceState) {
+        checkNotNull(deviceId);
+        final Map<H, E> mirrored = getMirrorMapForDevice(deviceId);
+
+        // Atomic wrappers only because the lambdas below need mutable counters.
+        final AtomicInteger added = new AtomicInteger();
+        final AtomicInteger updated = new AtomicInteger();
+        final AtomicInteger removed = new AtomicInteger();
+        // Pass 1: entries reported by the device that the mirror lacks.
+        deviceState.forEach((handle, entryToAdd) -> {
+            if (!mirrored.containsKey(handle)) {
+                log.debug("Adding mirror entry for {}: {}",
+                          deviceId, entryToAdd);
+                put(handle, entryToAdd);
+                added.incrementAndGet();
+            }
+        });
+        // Pass 2: mirrored entries that differ from, or are absent in, the device state.
+        mirrored.forEach((handle, localEntry) -> {
+            final E deviceEntry = deviceState.get(handle);
+            if (deviceEntry == null) {
+                log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
+                remove(handle);
+                removed.incrementAndGet();
+            } else if (!deviceEntry.equals(localEntry)) {
+                log.debug("Updating mirror entry for {}: {}-->{}",
+                          deviceId, localEntry, deviceEntry);
+                put(handle, deviceEntry);
+                updated.incrementAndGet();
+            }
+        });
+        final int changes = removed.get() + updated.get() + added.get();
+        if (changes > 0) {
+            log.info("Synchronized mirror entries for {}: {} removed, {} updated, {} added",
+                     deviceId, removed, updated, added);
+        }
+    }
+
+    private Set<H> getHandlesForDevice(DeviceId deviceId) {
+        // The result is a freshly collected set, detached from mirrorMap's own key set.
+        return mirrorMap.keySet().stream()
+                .filter(handle -> handle.deviceId().equals(deviceId)).collect(Collectors.toSet());
+    }
+
+    private Map<H, E> getMirrorMapForDevice(DeviceId deviceId) {
+        // Local copy of the device's entries, each TimedEntry unwrapped to its entry.
+        final Map<H, E> unwrapped = Maps.newHashMap();
+        mirrorMap.entrySet().stream().filter(kv -> kv.getKey().deviceId().equals(deviceId))
+                .forEach(kv -> unwrapped.put(kv.getKey(), kv.getValue().entry()));
+        return unwrapped;
+    }
+
+    private void removeAll(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        // Handles are gathered into their own set first, then dropped from the mirror.
+        getHandlesForDevice(deviceId).forEach(mirrorMap::remove);
+    }
+
+    public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            final DeviceId deviceId = event.subject();
+            log.debug("Flushing mirror for {}, pipeline status is {}", deviceId, event.type());
+            // Hand the flush to the shared pool instead of running it on the caller's thread.
+            SharedExecutors.getPoolThreadExecutor().execute(() -> removeAll(deviceId));
+        }
+
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
+        }
+    }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
new file mode 100644
index 0000000..1ca29c1
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.pi.runtime.PiActionGroupMember;
+import org.onosproject.net.pi.runtime.PiActionGroupMemberHandle;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Distributed implementation of a P4Runtime action profile member mirror,
+ * published as an OSGi service under {@link P4RuntimeActionProfileMemberMirror}.
+ */
+@Component(immediate = true, service = P4RuntimeActionProfileMemberMirror.class)
+public class DistributedP4RuntimeActionProfileMemberMirror
+        extends AbstractDistributedP4RuntimeMirror<PiActionGroupMemberHandle, PiActionGroupMember>
+        implements P4RuntimeActionProfileMemberMirror {
+
+    private static final String DIST_MAP_NAME = "onos-p4runtime-act-prof-member-mirror";
+
+    @Override
+    String mapName() {
+        return DIST_MAP_NAME;
+    }
+
+    @Override
+    KryoNamespace storeSerializer() {
+        return KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(TimedEntry.class)
+                .build();
+    }
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeActionProfileMemberMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeActionProfileMemberMirror.java
new file mode 100644
index 0000000..8ab1fa0
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeActionProfileMemberMirror.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.onosproject.net.pi.runtime.PiActionGroupMember;
+import org.onosproject.net.pi.runtime.PiActionGroupMemberHandle;
+
+/**
+ * Mirror of action profile members installed on a P4Runtime device, indexed by member handle.
+ */
+public interface P4RuntimeActionProfileMemberMirror
+        extends P4RuntimeMirror<PiActionGroupMemberHandle, PiActionGroupMember> {
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index ab18c9d..d1c9cdd 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -22,6 +22,7 @@
 import org.onosproject.net.pi.runtime.PiHandle;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
  * Service to keep track of the device state for a given class of PI entities.
@@ -71,4 +72,12 @@
      * @param handle handle
      */
     void remove(H handle);
+
+    /**
+     * Makes the mirror state of the given device match the given handle map (add, update, remove).
+     *
+     * @param deviceId  device ID
+     * @param handleMap entries the mirror should hold for that device, indexed by handle
+     */
+    void sync(DeviceId deviceId, Map<H, E> handleMap);
 }