ONOS-7050 Refactored P4Runtime FRP to use distributed stores

It uses the PI translation store and a newly introduced P4Runtime device
mirror.

Change-Id: Id2031af5e9bbdc8be4ec6967b867f97d35d54ab0
diff --git a/drivers/p4runtime/BUCK b/drivers/p4runtime/BUCK
index b8167e8..09c9bf7 100644
--- a/drivers/p4runtime/BUCK
+++ b/drivers/p4runtime/BUCK
@@ -2,9 +2,11 @@
 
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
+    '//lib:KRYO',
     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
     '//lib:grpc-netty-' + GRPC_VER,
+    '//core/store/serializers:onos-core-serializers',
 ]
 
 BUNDLES = [
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 053dd7c..ca9392f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -20,6 +20,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.grpc.StatusRuntimeException;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
+import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
@@ -32,15 +35,17 @@
 import org.onosproject.net.pi.runtime.PiCounterCellData;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
 import org.onosproject.net.pi.runtime.PiTableEntry;
+import org.onosproject.net.pi.runtime.PiTableEntryHandle;
+import org.onosproject.net.pi.service.PiFlowRuleTranslator;
+import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
 import org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType;
-import org.onosproject.p4runtime.api.P4RuntimeFlowRuleWrapper;
-import org.onosproject.p4runtime.api.P4RuntimeTableEntryReference;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -49,6 +54,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Collections.singleton;
 import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.APPLY;
 import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.REMOVE;
 import static org.onosproject.net.flow.FlowEntry.FlowEntryState.ADDED;
@@ -59,44 +65,40 @@
 /**
  * Implementation of the flow rule programmable behaviour for P4Runtime.
  */
-public class P4RuntimeFlowRuleProgrammable extends AbstractP4RuntimeHandlerBehaviour implements FlowRuleProgrammable {
+public class P4RuntimeFlowRuleProgrammable
+        extends AbstractP4RuntimeHandlerBehaviour
+        implements FlowRuleProgrammable {
 
-    /*
-    When updating an existing rule, if true, we issue a DELETE operation before inserting the new one, otherwise we
-    issue a MODIFY operation. This is useful fore devices that do not support MODIFY operations for table entries.
-     */
+    // When updating an existing rule, if true, we issue a DELETE operation
+    // before inserting the new one, otherwise we issue a MODIFY operation. This
+    // is useful fore devices that do not support MODIFY operations for table
+    // entries.
     // TODO: make this attribute configurable by child drivers (e.g. BMv2 or Tofino)
     private boolean deleteEntryBeforeUpdate = true;
 
-    /*
-    If true, we ignore re-installing rules that are already known in the ENTRY_STORE, i.e. same match key and action.
-     */
-    // TODO: can remove this check as soon as the multi-apply-per-same-flow rule bug is fixed.
-    private boolean checkEntryStoreBeforeUpdate = true;
+    // If true, we ignore re-installing rules that are already exists the
+    // device, i.e. same match key and action.
+    // FIXME: can remove this check as soon as the multi-apply-per-same-flow rule bug is fixed.
+    private boolean checkStoreBeforeUpdate = true;
 
-    /*
-    If true, we avoid querying the device and return the content of the ENTRY_STORE.
-     */
-    // TODO: set to false after bmv2/PI bug fixed
+    // If true, we avoid querying the device and return what's already known by
+    // the ONOS store.
     private boolean ignoreDeviceWhenGet = true;
 
-    /*
-    If true, we read all direct counters of a table with one request. Otherwise, send as many request as the number of
-    table entries.
-     */
-    // TODO: set to true as soon as the feature is implemented in P4Runtime.
+    /* If true, we read all direct counters of a table with one request.
+    Otherwise, we send as many requests as the number of table entries. */
+    // FIXME: set to true as soon as the feature is implemented in P4Runtime.
     private boolean readAllDirectCounters = false;
 
     // Needed to synchronize operations over the same table entry.
-    private static final ConcurrentMap<P4RuntimeTableEntryReference, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
-
-    // TODO: replace with distributed store.
-    // Can reuse old BMv2TableEntryService from ONOS 1.6
-    private static final ConcurrentMap<P4RuntimeTableEntryReference, P4RuntimeFlowRuleWrapper> ENTRY_STORE =
-            Maps.newConcurrentMap();
+    // FIXME: locks should be removed when unused (hint use cache with timeout)
+    private static final ConcurrentMap<PiTableEntryHandle, Lock>
+            ENTRY_LOCKS = Maps.newConcurrentMap();
 
     private PiPipelineModel pipelineModel;
     private PiPipelineInterpreter interpreter;
+    private P4RuntimeTableMirror tableMirror;
+    private PiFlowRuleTranslator translator;
 
     @Override
     protected boolean setupBehaviour() {
@@ -111,6 +113,8 @@
         }
         interpreter = device.as(PiPipelineInterpreter.class);
         pipelineModel = pipeconf.pipelineModel();
+        tableMirror = handler().get(P4RuntimeTableMirror.class);
+        translator = piTranslationService.flowRuleTranslator();
         return true;
     }
 
@@ -122,98 +126,70 @@
         }
 
         if (ignoreDeviceWhenGet) {
-            return ENTRY_STORE.values().stream()
-                    .filter(frWrapper -> frWrapper.rule().deviceId().equals(this.deviceId))
-                    .map(frWrapper -> new DefaultFlowEntry(frWrapper.rule(), ADDED, frWrapper.lifeInSeconds(),
-                                                           0, 0))
-                    .collect(Collectors.toList());
+            return getFlowEntriesFromMirror();
         }
 
-        ImmutableList.Builder<FlowEntry> resultBuilder = ImmutableList.builder();
-        List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
+        final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
+        final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
 
         for (PiTableModel tableModel : pipelineModel.tables()) {
 
-            PiTableId piTableId = tableModel.id();
+            final PiTableId piTableId = tableModel.id();
 
-            Collection<PiTableEntry> installedEntries;
+            // Read table entries.
+            final Collection<PiTableEntry> installedEntries;
             try {
-                // TODO: optimize by dumping entries and counters in parallel, from ALL tables with the same request.
+                // TODO: optimize by dumping entries and counters in parallel
+                // From ALL tables with the same request.
                 installedEntries = client.dumpTable(piTableId, pipeconf).get();
             } catch (InterruptedException | ExecutionException e) {
                 if (!(e.getCause() instanceof StatusRuntimeException)) {
                     // gRPC errors are logged in the client.
-                    log.error("Exception while dumping table {} of {}", piTableId, deviceId, e);
+                    log.error("Exception while dumping table {} of {}",
+                              piTableId, deviceId, e);
                 }
-                return Collections.emptyList();
+                continue; // next table
             }
 
-            Map<PiTableEntry, PiCounterCellData> counterCellMap;
-            try {
-                if (interpreter.mapTableCounter(piTableId).isPresent()) {
-                    PiCounterId piCounterId = interpreter.mapTableCounter(piTableId).get();
-                    Collection<PiCounterCellData> cellDatas;
-                    if (readAllDirectCounters) {
-                        cellDatas = client.readAllCounterCells(Collections.singleton(piCounterId), pipeconf).get();
-                    } else {
-                        Set<PiCounterCellId> cellIds = installedEntries.stream()
-                                .map(entry -> PiCounterCellId.ofDirect(piCounterId, entry))
-                                .collect(Collectors.toSet());
-                        cellDatas = client.readCounterCells(cellIds, pipeconf).get();
-                    }
-                    counterCellMap = cellDatas.stream()
-                            .collect(Collectors.toMap(c -> (c.cellId()).tableEntry(), c -> c));
-                } else {
-                    counterCellMap = Collections.emptyMap();
-                }
-                installedEntries = client.dumpTable(piTableId, pipeconf).get();
-            } catch (InterruptedException | ExecutionException e) {
-                if (!(e.getCause() instanceof StatusRuntimeException)) {
-                    // gRPC errors are logged in the client.
-                    log.error("Exception while reading counters of table {} of {}", piTableId, deviceId, e);
-                }
+            if (installedEntries.size() == 0) {
+                continue; // next table
+            }
+
+            // Read table direct counters (if any).
+            final Map<PiTableEntry, PiCounterCellData> counterCellMap;
+            if (interpreter.mapTableCounter(piTableId).isPresent()) {
+                PiCounterId piCounterId = interpreter.mapTableCounter(piTableId).get();
+                counterCellMap = readEntryCounters(piCounterId, installedEntries);
+            } else {
                 counterCellMap = Collections.emptyMap();
             }
 
+            // Forge flow entries with counter values.
             for (PiTableEntry installedEntry : installedEntries) {
 
-                P4RuntimeTableEntryReference entryRef = new P4RuntimeTableEntryReference(deviceId,
-                                                                                         piTableId,
-                                                                                         installedEntry.matchKey());
+                final FlowEntry flowEntry = forgeFlowEntry(
+                        installedEntry, counterCellMap.get(installedEntry));
 
-                if (!ENTRY_STORE.containsKey(entryRef)) {
-                    // Inconsistent entry
+                if (flowEntry == null) {
+                    // Entry is on device but unknown to translation service or
+                    // device mirror. Inconsistent. Mark for removal.
+                    // TODO: make this behaviour configurable
+                    // In some cases it's fine for the device to have rules
+                    // that were not installed by us.
                     inconsistentEntries.add(installedEntry);
-                    continue; // next one.
+                } else {
+                    result.add(flowEntry);
                 }
-
-                P4RuntimeFlowRuleWrapper frWrapper = ENTRY_STORE.get(entryRef);
-
-                long bytes = 0L;
-                long packets = 0L;
-                if (counterCellMap.containsKey(installedEntry)) {
-                    PiCounterCellData counterCellData = counterCellMap.get(installedEntry);
-                    bytes = counterCellData.bytes();
-                    packets = counterCellData.packets();
-                }
-
-                resultBuilder.add(new DefaultFlowEntry(frWrapper.rule(),
-                                                       ADDED,
-                                                       frWrapper.lifeInSeconds(),
-                                                       packets,
-                                                       bytes));
             }
         }
 
         if (inconsistentEntries.size() > 0) {
-            log.warn("Found {} entries in {} that are not known by table entry service," +
-                             " removing them", inconsistentEntries.size(), deviceId);
-            inconsistentEntries.forEach(entry -> log.debug(entry.toString()));
-            // Async remove them.
-            client.writeTableEntries(inconsistentEntries, DELETE, pipeconf);
+            // Async clean up inconsistent entries.
+            SharedExecutors.getSingleThreadExecutor().execute(
+                    () -> cleanUpInconsistentEntries(inconsistentEntries));
         }
 
-        return resultBuilder.build();
+        return result.build();
     }
 
     @Override
@@ -226,109 +202,206 @@
         return processFlowRules(rules, REMOVE);
     }
 
-    private Collection<FlowRule> processFlowRules(Collection<FlowRule> rules, Operation operation) {
+    private FlowEntry forgeFlowEntry(PiTableEntry entry,
+                                     PiCounterCellData cellData) {
+        final PiTableEntryHandle handle = PiTableEntryHandle
+                .of(deviceId, entry);
+        final Optional<PiTranslatedEntity<FlowRule, PiTableEntry>>
+                translatedEntity = translator.lookup(handle);
+        final TimedEntry<PiTableEntry> timedEntry = tableMirror.get(handle);
+
+        if (!translatedEntity.isPresent()) {
+            log.debug("Handle not found in store: {}", handle);
+            return null;
+        }
+
+        if (timedEntry == null) {
+            log.debug("Handle not found in device mirror: {}", handle);
+            return null;
+        }
+
+        if (cellData != null) {
+            return new DefaultFlowEntry(translatedEntity.get().original(),
+                                        ADDED, timedEntry.lifeSec(), cellData.bytes(),
+                                        cellData.bytes());
+        } else {
+            return new DefaultFlowEntry(translatedEntity.get().original(),
+                                        ADDED, timedEntry.lifeSec(), 0, 0);
+        }
+    }
+
+    private Collection<FlowEntry> getFlowEntriesFromMirror() {
+        return tableMirror.getAll(deviceId).stream()
+                .map(timedEntry -> forgeFlowEntry(
+                        timedEntry.entry(), null))
+                .collect(Collectors.toList());
+    }
+
+    private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
+        log.warn("Found {} entries from {} not on translation store, removing them...",
+                 piEntries.size(), deviceId);
+        piEntries.forEach(entry -> {
+            log.debug(entry.toString());
+            applyEntry(PiTableEntryHandle.of(deviceId, entry),
+                       entry, null, REMOVE);
+        });
+    }
+
+    private Collection<FlowRule> processFlowRules(Collection<FlowRule> rules,
+                                                  Operation driverOperation) {
 
         if (!setupBehaviour()) {
             return Collections.emptyList();
         }
 
-        ImmutableList.Builder<FlowRule> processedFlowRuleListBuilder = ImmutableList.builder();
+        final ImmutableList.Builder<FlowRule> result = ImmutableList.builder();
 
-        // TODO: send write operations in bulk (e.g. all entries to insert, modify or delete).
+        // TODO: send writes in bulk (e.g. all entries to insert, modify or delete).
         // Instead of calling the client for each one of them.
 
-        for (FlowRule rule : rules) {
+        for (FlowRule ruleToApply : rules) {
 
-            PiTableEntry piTableEntry;
-
+            final PiTableEntry piEntryToApply;
             try {
-                piTableEntry = piTranslationService.flowRuleTranslator().translate(rule, pipeconf);
+                piEntryToApply = translator.translate(ruleToApply, pipeconf);
             } catch (PiTranslationException e) {
-                log.warn("Unable to translate flow rule: {} - {}", e.getMessage(), rule);
-                continue; // next rule
+                log.warn("Unable to translate flow rule for pipeconf '{}': {} - {}",
+                         pipeconf.id(), e.getMessage(), ruleToApply);
+                // Next rule.
+                continue;
             }
 
-            PiTableId tableId = piTableEntry.table();
-            P4RuntimeTableEntryReference entryRef = new P4RuntimeTableEntryReference(deviceId,
-                                                                                     tableId, piTableEntry.matchKey());
+            final PiTableEntryHandle handle = PiTableEntryHandle
+                    .of(deviceId, piEntryToApply);
 
-            Lock lock = ENTRY_LOCKS.computeIfAbsent(entryRef, k -> new ReentrantLock());
+            // Serialize operations over the same match key/table/device ID.
+            final Lock lock = ENTRY_LOCKS.computeIfAbsent(handle, k -> new ReentrantLock());
             lock.lock();
-
             try {
-
-                P4RuntimeFlowRuleWrapper frWrapper = ENTRY_STORE.get(entryRef);
-                WriteOperationType opType = null;
-                boolean doApply = true;
-
-                if (operation == APPLY) {
-                    if (frWrapper == null) {
-                        // Entry is first-timer.
-                        opType = INSERT;
-                    } else {
-                        // This match key already exists in the device.
-                        if (checkEntryStoreBeforeUpdate &&
-                                piTableEntry.action().equals(frWrapper.piTableEntry().action())) {
-                            doApply = false;
-                            log.debug("Ignoring re-apply of existing entry: {}", piTableEntry);
-                        }
-                        if (doApply) {
-                            if (deleteEntryBeforeUpdate) {
-                                // We've seen some strange error when trying to modify existing flow rules.
-                                // Remove before re-adding the modified one.
-                                try {
-                                    if (client.writeTableEntries(newArrayList(piTableEntry), DELETE, pipeconf).get()) {
-                                        frWrapper = null;
-                                    } else {
-                                        log.warn("Unable to DELETE table entry (before re-adding) in {}: {}",
-                                                 deviceId, piTableEntry);
-                                    }
-                                } catch (InterruptedException | ExecutionException e) {
-                                    log.warn("Exception while deleting table entry:", operation.name(), e);
-                                }
-                                opType = INSERT;
-                            } else {
-                                opType = MODIFY;
-                            }
-                        }
-                    }
-                } else {
-                    opType = DELETE;
+                if (applyEntry(handle, piEntryToApply,
+                               ruleToApply, driverOperation)) {
+                    result.add(ruleToApply);
                 }
-
-                if (doApply) {
-                    try {
-                        if (client.writeTableEntries(newArrayList(piTableEntry), opType, pipeconf).get()) {
-                            processedFlowRuleListBuilder.add(rule);
-                            if (operation == APPLY) {
-                                frWrapper = new P4RuntimeFlowRuleWrapper(rule, piTableEntry,
-                                                                         System.currentTimeMillis());
-                            } else {
-                                frWrapper = null;
-                            }
-                        } else {
-                            log.warn("Unable to {} table entry in {}: {}", opType.name(), deviceId, piTableEntry);
-                        }
-                    } catch (InterruptedException | ExecutionException e) {
-                        log.warn("Exception while performing {} table entry operation:", operation.name(), e);
-                    }
-                } else {
-                    processedFlowRuleListBuilder.add(rule);
-                }
-
-                // Update entryRef binding in table entry service.
-                if (frWrapper != null) {
-                    ENTRY_STORE.put(entryRef, frWrapper);
-                } else {
-                    ENTRY_STORE.remove(entryRef);
-                }
-
             } finally {
                 lock.unlock();
             }
         }
 
-        return processedFlowRuleListBuilder.build();
+        return result.build();
+    }
+
+    /**
+     * Applies the given entry to the device, and returns true if the operation
+     * was successful, false otherwise.
+     */
+    private boolean applyEntry(PiTableEntryHandle handle,
+                               PiTableEntry piEntryToApply,
+                               FlowRule ruleToApply,
+                               Operation driverOperation) {
+        // Depending on the driver operation, and if a matching rule exists on
+        // the device, decide which P4 Runtime write operation to perform for
+        // this entry.
+        final TimedEntry<PiTableEntry> piEntryOnDevice = tableMirror.get(handle);
+        final WriteOperationType p4Operation;
+        if (driverOperation == APPLY) {
+            if (piEntryOnDevice == null) {
+                // Entry is first-timer.
+                p4Operation = INSERT;
+            } else {
+                if (checkStoreBeforeUpdate
+                        && piEntryToApply.action().equals(piEntryOnDevice.entry().action())) {
+                    log.debug("Ignoring re-apply of existing entry: {}", piEntryToApply);
+                    p4Operation = null;
+                } else if (deleteEntryBeforeUpdate) {
+                    // Some devices return error when updating existing
+                    // entries. If requested, remove entry before
+                    // re-inserting the modified one.
+                    applyEntry(handle, piEntryOnDevice.entry(), null, REMOVE);
+                    p4Operation = INSERT;
+                } else {
+                    p4Operation = MODIFY;
+                }
+            }
+        } else {
+            p4Operation = DELETE;
+        }
+
+        if (p4Operation != null) {
+            if (writeEntry(piEntryToApply, p4Operation)) {
+                updateStores(handle, piEntryToApply, ruleToApply, p4Operation);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            // If no operation, let's pretend we applied the rule to the device.
+            return true;
+        }
+    }
+
+    /**
+     * Performs a write operation on the device.
+     */
+    private boolean writeEntry(PiTableEntry entry,
+                               WriteOperationType p4Operation) {
+        try {
+            if (client.writeTableEntries(
+                    newArrayList(entry), p4Operation, pipeconf).get()) {
+                return true;
+            } else {
+                log.warn("Unable to {} table entry in {}: {}",
+                         p4Operation.name(), deviceId, entry);
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            log.warn("Exception while performing {} table entry operation:",
+                     p4Operation, e);
+        }
+        return false;
+    }
+
+    private void updateStores(PiTableEntryHandle handle,
+                              PiTableEntry entry,
+                              FlowRule rule,
+                              WriteOperationType p4Operation) {
+        switch (p4Operation) {
+            case INSERT:
+            case MODIFY:
+                tableMirror.put(handle, entry);
+                translator.learn(handle, new PiTranslatedEntity<>(rule, entry, handle));
+                break;
+            case DELETE:
+                tableMirror.remove(handle);
+                translator.forget(handle);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown operation " + p4Operation.name());
+        }
+    }
+
+    private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
+            PiCounterId counterId, Collection<PiTableEntry> tableEntries) {
+        Collection<PiCounterCellData> cellDatas;
+        try {
+            if (readAllDirectCounters) {
+                cellDatas = client.readAllCounterCells(
+                        singleton(counterId), pipeconf).get();
+            } else {
+                Set<PiCounterCellId> cellIds = tableEntries.stream()
+                        .map(entry -> PiCounterCellId.ofDirect(counterId, entry))
+                        .collect(Collectors.toSet());
+                cellDatas = client.readCounterCells(cellIds, pipeconf).get();
+            }
+            return cellDatas.stream()
+                    .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+        } catch (InterruptedException | ExecutionException e) {
+            if (!(e.getCause() instanceof StatusRuntimeException)) {
+                // gRPC errors are logged in the client.
+                log.error("Exception while reading counter '{}' from {}: {}",
+                          counterId, deviceId, e);
+            }
+            return Collections.emptyMap();
+        }
     }
 
     enum Operation {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
new file mode 100644
index 0000000..be0b0b3
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import com.google.common.annotations.Beta;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstract implementation of a distributed P4Runtime mirror, backed by an
+ * {@link EventuallyConsistentMap}.
+ *
+ * @param <H> handle class
+ * @param <E> entry class
+ */
+@Beta
+@Component(immediate = true)
+public abstract class AbstractDistributedP4RuntimeMirror
+        <H extends PiHandle, E extends PiEntity>
+        implements P4RuntimeMirror<H, E> {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private StorageService storageService;
+
+    private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
+
+    @Activate
+    public void activate() {
+        mirrorMap = storageService
+                .<H, TimedEntry<E>>eventuallyConsistentMapBuilder()
+                .withName(mapName())
+                .withSerializer(storeSerializer())
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        log.info("Started");
+    }
+
+    abstract String mapName();
+
+    abstract KryoNamespace storeSerializer();
+
+    @Deactivate
+    public void deactivate() {
+        mirrorMap = null;
+        log.info("Stopped");
+    }
+
+    @Override
+    public Collection<TimedEntry<E>> getAll(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return mirrorMap.entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public TimedEntry<E> get(H handle) {
+        checkNotNull(handle);
+        return mirrorMap.get(handle);
+    }
+
+    @Override
+    public void put(H handle, E entry) {
+        checkNotNull(handle);
+        checkNotNull(entry);
+        final long now = new WallClockTimestamp().unixTimestamp();
+        final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
+        mirrorMap.put(handle, timedEntry);
+    }
+
+    @Override
+    public void remove(H handle) {
+        checkNotNull(handle);
+        mirrorMap.remove(handle);
+    }
+
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java
new file mode 100644
index 0000000..f37cf44
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.pi.runtime.PiTableEntry;
+import org.onosproject.net.pi.runtime.PiTableEntryHandle;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+/**
+ * Distributed implementation of a P4Runtime table mirror.
+ */
+@Component(immediate = true)
+@Service
+public final class DistributedP4RuntimeTableMirror
+        extends AbstractDistributedP4RuntimeMirror
+                        <PiTableEntryHandle, PiTableEntry>
+        implements P4RuntimeTableMirror {
+
+    private static final String DIST_MAP_NAME = "onos-p4runtime-table-mirror";
+
+    @Override
+    String mapName() {
+        return DIST_MAP_NAME;
+    }
+
+    @Override
+    KryoNamespace storeSerializer() {
+        return KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(TimedEntry.class)
+                .build();
+    }
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
new file mode 100644
index 0000000..ab18c9d
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiHandle;
+
+import java.util.Collection;
+
+/**
+ * Service to keep track of the device state for a given class of PI entities.
+ * The need of this service comes from the fact that P4 Runtime makes a
+ * distinction between INSERT and MODIFY operations, while ONOS drivers use a
+ * more generic "APPLY" behaviour (i.e. ADD or UPDATE). When applying an entry,
+ * we need to know if another one with the same handle (e.g. table entry with
+ * same match key) is already on the device to decide between INSERT or MODIFY.
+ * Moreover, this service maintains a "timed" version of PI entities such that
+ * we can compute the life of the entity on the device.
+ *
+ * @param <H> Handle class
+ * @param <E> Entity class
+ */
+@Beta
+public interface P4RuntimeMirror
+        <H extends PiHandle, E extends PiEntity> {
+
+    /**
+     * Returns all entries for the given device ID.
+     *
+     * @param deviceId device ID
+     * @return collection of table entries
+     */
+    Collection<TimedEntry<E>> getAll(DeviceId deviceId);
+
+    /**
+     * Returns entry associated to the given handle, if present, otherwise
+     * null.
+     *
+     * @param handle handle
+     * @return PI table entry
+     */
+    TimedEntry<E> get(H handle);
+
+    /**
+     * Stores the given entry associating it to the given handle.
+     *
+     * @param handle handle
+     * @param entry  entry
+     */
+    void put(H handle, E entry);
+
+    /**
+     * Removes the entry associated to the given handle.
+     *
+     * @param handle handle
+     */
+    void remove(H handle);
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeTableMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeTableMirror.java
new file mode 100644
index 0000000..318e2b0
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeTableMirror.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.onosproject.net.pi.runtime.PiTableEntry;
+import org.onosproject.net.pi.runtime.PiTableEntryHandle;
+
+/**
+ * Mirror of table entries installed on a P4Runtime device.
+ */
+public interface P4RuntimeTableMirror
+        extends P4RuntimeMirror<PiTableEntryHandle, PiTableEntry> {
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java
new file mode 100644
index 0000000..76b44a0
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime.mirror;
+
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.store.service.WallClockTimestamp;
+
+public class TimedEntry<E extends PiEntity> {
+
+    private final long timestamp;
+    private final E entity;
+
+    TimedEntry(long timestamp, E entity) {
+        this.timestamp = timestamp;
+        this.entity = entity;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public E entry() {
+        return entity;
+    }
+
+    public long lifeSec() {
+        final long now = new WallClockTimestamp().unixTimestamp();
+        return (now - timestamp) / 1000;
+    }
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/package-info.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/package-info.java
new file mode 100644
index 0000000..d9b21d6
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * P4 Runtime device mirror.
+ */
+package org.onosproject.drivers.p4runtime.mirror;
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeFlowRuleWrapper.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeFlowRuleWrapper.java
deleted file mode 100644
index 634116f..0000000
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeFlowRuleWrapper.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.api;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.pi.runtime.PiTableEntry;
-
-/**
- * A wrapper for a ONOS flow rule installed on a P4Runtime device.
- */
-@Beta
-public final class P4RuntimeFlowRuleWrapper {
-
-    private final FlowRule rule;
-    private final PiTableEntry piTableEntry;
-    private final long installedOnMillis;
-
-    /**
-     * Creates a new flow rule wrapper.
-     *
-     * @param rule              a flow rule
-     * @param piTableEntry      PI table entry
-     * @param installedOnMillis the time (in milliseconds, since January 1, 1970 UTC) when the flow rule was installed
-     *                          on the device
-     */
-    public P4RuntimeFlowRuleWrapper(FlowRule rule, PiTableEntry piTableEntry, long installedOnMillis) {
-        this.rule = rule;
-        this.piTableEntry = piTableEntry;
-        this.installedOnMillis = installedOnMillis;
-    }
-
-    /**
-     * Returns the flow rule contained by this wrapper.
-     *
-     * @return a flow rule
-     */
-    public FlowRule rule() {
-        return rule;
-    }
-
-    /**
-     * Returns the PI table entry defined by this wrapper.
-     *
-     * @return table entry
-     */
-    public PiTableEntry piTableEntry() {
-        return piTableEntry;
-    }
-
-    /**
-     * Return the number of seconds since when this flow rule was installed on the device.
-     *
-     * @return an integer value
-     */
-    public long lifeInSeconds() {
-        return (System.currentTimeMillis() - installedOnMillis) / 1000;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(rule, installedOnMillis);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final P4RuntimeFlowRuleWrapper other = (P4RuntimeFlowRuleWrapper) obj;
-        return Objects.equal(this.rule, other.rule)
-                && Objects.equal(this.installedOnMillis, other.installedOnMillis);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("rule", rule)
-                .add("installedOnMillis", installedOnMillis)
-                .toString();
-    }
-}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeTableEntryReference.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeTableEntryReference.java
deleted file mode 100644
index d345f15..0000000
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeTableEntryReference.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.api;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.runtime.PiMatchKey;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Class containing the reference for a table entry in P4Runtime.
- */
-public final class P4RuntimeTableEntryReference {
-
-    private final DeviceId deviceId;
-    private final PiTableId tableId;
-    private final PiMatchKey matchKey;
-
-    /**
-     * Creates a new table entry reference.
-     *
-     * @param deviceId a device ID
-     * @param tableId  a table name
-     * @param matchKey a match key
-     */
-    public P4RuntimeTableEntryReference(DeviceId deviceId, PiTableId tableId, PiMatchKey matchKey) {
-        this.deviceId = checkNotNull(deviceId);
-        this.tableId = checkNotNull(tableId);
-        this.matchKey = checkNotNull(matchKey);
-    }
-
-    /**
-     * Returns the device ID of this table entry reference.
-     *
-     * @return a device ID
-     */
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-
-    /**
-     * Returns the table id of this table entry reference.
-     *
-     * @return a table name
-     */
-    public PiTableId tableId() {
-        return tableId;
-    }
-
-    /**
-     * Returns the match key of this table entry reference.
-     *
-     * @return a match key
-     */
-    public PiMatchKey matchKey() {
-        return matchKey;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(deviceId, tableId, matchKey);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final P4RuntimeTableEntryReference other = (P4RuntimeTableEntryReference) obj;
-        return Objects.equal(this.deviceId, other.deviceId)
-                && Objects.equal(this.tableId, other.tableId)
-                && Objects.equal(this.matchKey, other.matchKey);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("deviceId", deviceId)
-                .add("tableId", tableId)
-                .add("matchKey", matchKey)
-                .toString();
-    }
-}