Merge branch 'master' into dev-karaf-4.2.1
Change-Id: I86b9d80581cd76a7c20e05201023090f9692d1ab
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index 079ab62f4..caa72fe 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -17,15 +17,21 @@
package org.onosproject.drivers.p4runtime.mirror;
import com.google.common.annotations.Beta;
+import com.google.common.collect.Maps;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
+
import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.runtime.PiEntity;
import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
@@ -33,9 +39,12 @@
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -56,8 +65,14 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private PiPipeconfWatchdogService pipeconfWatchdogService;
+
private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
+ private final PiPipeconfWatchdogListener pipeconfListener =
+ new InternalPipeconfWatchdogListener();
+
@Activate
public void activate() {
mirrorMap = storageService
@@ -66,6 +81,7 @@
.withSerializer(storeSerializer())
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
+ pipeconfWatchdogService.addListener(pipeconfListener);
log.info("Started");
}
@@ -75,6 +91,7 @@
@Deactivate
public void deactivate() {
+ pipeconfWatchdogService.removeListener(pipeconfListener);
mirrorMap.destroy();
mirrorMap = null;
log.info("Stopped");
@@ -99,6 +116,14 @@
public void put(H handle, E entry) {
checkNotNull(handle);
checkNotNull(entry);
+ final PiPipeconfWatchdogService.PipelineStatus status =
+ pipeconfWatchdogService.getStatus(handle.deviceId());
+ if (!status.equals(READY)) {
+ log.info("Ignoring device mirror update because pipeline " +
+ "status of {} is {}: {}",
+ handle.deviceId(), status, entry);
+ return;
+ }
final long now = new WallClockTimestamp().unixTimestamp();
final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
mirrorMap.put(handle, timedEntry);
@@ -110,4 +135,77 @@
mirrorMap.remove(handle);
}
+    /**
+     * Aligns the mirror content for the given device with the given device
+     * state: entries present only in {@code deviceState} are added, entries
+     * whose value differs are overwritten with the device value, and entries
+     * present only in the mirror are removed. A summary is logged at INFO
+     * level if at least one change was attempted.
+     *
+     * @param deviceId    device ID
+     * @param deviceState entries to align the mirror with, indexed by handle
+     * @throws NullPointerException if {@code deviceId} or {@code deviceState}
+     *                              is null
+     */
+    @Override
+    public void sync(DeviceId deviceId, Map<H, E> deviceState) {
+        checkNotNull(deviceId);
+        // Fail fast on a missing device state before reading the store.
+        checkNotNull(deviceState);
+        // Detached copy of the mirror for this device: it can be iterated
+        // safely while put()/remove() below modify the underlying store.
+        final Map<H, E> localState = getMirrorMapForDevice(deviceId);
+
+        // Counters track attempted changes; put() may still drop an update
+        // when the pipeline status of the device is not READY.
+        final AtomicInteger removeCount = new AtomicInteger(0);
+        final AtomicInteger updateCount = new AtomicInteger(0);
+        final AtomicInteger addCount = new AtomicInteger(0);
+        // Add missing entries.
+        deviceState.forEach((deviceHandle, entryToAdd) -> {
+            if (!localState.containsKey(deviceHandle)) {
+                log.debug("Adding mirror entry for {}: {}",
+                          deviceId, entryToAdd);
+                put(deviceHandle, entryToAdd);
+                addCount.incrementAndGet();
+            }
+        });
+        // Update or remove local entries.
+        localState.forEach((localHandle, localEntry) -> {
+            final E deviceEntry = deviceState.get(localHandle);
+            if (deviceEntry == null) {
+                log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
+                remove(localHandle);
+                removeCount.incrementAndGet();
+            } else if (!deviceEntry.equals(localEntry)) {
+                log.debug("Updating mirror entry for {}: {}-->{}",
+                          deviceId, localEntry, deviceEntry);
+                put(localHandle, deviceEntry);
+                updateCount.incrementAndGet();
+            }
+        });
+        if (removeCount.get() + updateCount.get() + addCount.get() > 0) {
+            log.info("Synchronized mirror entries for {}: {} removed, {} updated, {} added",
+                     deviceId, removeCount, updateCount, addCount);
+        }
+    }
+
+ private Set<H> getHandlesForDevice(DeviceId deviceId) {
+ return mirrorMap.keySet().stream()
+ .filter(h -> h.deviceId().equals(deviceId))
+ .collect(Collectors.toSet());
+ }
+
+    /**
+     * Builds a new map holding the mirror entries of the given device, with
+     * the timestamp wrapper stripped from each value.
+     *
+     * @param deviceId device ID
+     * @return newly created map of handles to entries for the given device
+     */
+    private Map<H, E> getMirrorMapForDevice(DeviceId deviceId) {
+        final Map<H, E> entriesByHandle = Maps.newHashMap();
+        for (Map.Entry<H, TimedEntry<E>> mirrorItem : mirrorMap.entrySet()) {
+            final H handle = mirrorItem.getKey();
+            if (handle.deviceId().equals(deviceId)) {
+                entriesByHandle.put(handle, mirrorItem.getValue().entry());
+            }
+        }
+        return entriesByHandle;
+    }
+
+    /**
+     * Removes from the mirror every entry that belongs to the given device.
+     *
+     * @param deviceId device ID, must not be null
+     */
+    private void removeAll(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        // The handles are collected into a separate set first, so the store
+        // is not modified while its key set is being traversed.
+        for (H staleHandle : getHandlesForDevice(deviceId)) {
+            mirrorMap.remove(staleHandle);
+        }
+    }
+
+    /**
+     * Listener that flushes the mirror entries of a device when the pipeconf
+     * watchdog reports a {@code PIPELINE_UNKNOWN} event for that device.
+     */
+    public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        /**
+         * Schedules the removal of all mirror entries of the event subject.
+         *
+         * @param event pipeconf watchdog event
+         */
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            // Read the subject once so the log line and the async task are
+            // guaranteed to refer to the same device.
+            final DeviceId deviceId = event.subject();
+            log.debug("Flushing mirror for {}, pipeline status is {}",
+                      deviceId, event.type());
+            // The flush is handed to a shared pool thread instead of being
+            // executed on the thread that delivers the event.
+            SharedExecutors.getPoolThreadExecutor().execute(
+                    () -> removeAll(deviceId));
+        }
+
+        /**
+         * Accepts only events of type {@code PIPELINE_UNKNOWN}.
+         *
+         * @param event pipeconf watchdog event
+         * @return true if the event type is {@code PIPELINE_UNKNOWN}
+         */
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            // Identity comparison is the idiomatic, null-safe way to compare
+            // enum constants.
+            return event.type() == PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
+        }
+    }
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
new file mode 100644
index 0000000..1ca29c1
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.pi.runtime.PiActionGroupMember;
+import org.onosproject.net.pi.runtime.PiActionGroupMemberHandle;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Distributed implementation of a P4Runtime action profile member mirror.
+ */
+@Component(immediate = true, service = AbstractDistributedP4RuntimeMirror.class)
+public class DistributedP4RuntimeActionProfileMemberMirror
+ extends AbstractDistributedP4RuntimeMirror
+ <PiActionGroupMemberHandle, PiActionGroupMember>
+ implements P4RuntimeActionProfileMemberMirror {
+
+ private static final String DIST_MAP_NAME = "onos-p4runtime-act-prof-member-mirror";
+
+ @Override
+ String mapName() {
+ return DIST_MAP_NAME;
+ }
+
+ @Override
+ KryoNamespace storeSerializer() {
+ return KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(TimedEntry.class)
+ .build();
+ }
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeActionProfileMemberMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeActionProfileMemberMirror.java
new file mode 100644
index 0000000..8ab1fa0
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeActionProfileMemberMirror.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.onosproject.net.pi.runtime.PiActionGroupMember;
+import org.onosproject.net.pi.runtime.PiActionGroupMemberHandle;
+
+/**
+ * Mirror of action profile members installed on a P4Runtime device.
+ * <p>
+ * Specializes {@link P4RuntimeMirror} to {@link PiActionGroupMember} entries
+ * identified by {@link PiActionGroupMemberHandle}; it declares no additional
+ * methods.
+ */
+public interface P4RuntimeActionProfileMemberMirror
+ extends P4RuntimeMirror<PiActionGroupMemberHandle, PiActionGroupMember> {
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index ab18c9d..d1c9cdd 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -22,6 +22,7 @@
import org.onosproject.net.pi.runtime.PiHandle;
import java.util.Collection;
+import java.util.Map;
/**
* Service to keep track of the device state for a given class of PI entities.
@@ -71,4 +72,12 @@
* @param handle handle
*/
void remove(H handle);
+
+ /**
+ * Synchronizes the mirror state for the given device ID with the given
+ * handle map, i.e. a map of entries indexed by their handle.
+ *
+ * @param deviceId device ID
+ * @param handleMap map of handles to entries to synchronize the mirror with
+ */
+ void sync(DeviceId deviceId, Map<H, E> handleMap);
}