[ONOS-7731] Add implementation of openstack vtap store and criterion

Change-Id: I7f41652f127038af9d3f79b34d427d28ce162d50
diff --git a/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
new file mode 100644
index 0000000..d6f63eb
--- /dev/null
+++ b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
@@ -0,0 +1,570 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstackvtap.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.openstackvtap.api.OpenstackVtap;
+import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
+import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
+import org.onosproject.openstackvtap.api.OpenstackVtapId;
+import org.onosproject.openstackvtap.api.OpenstackVtapStore;
+import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages the inventory of OpenStack vTaps using a {@code ConsistentMap}.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedOpenstackVtapStore
+        extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
+        implements OpenstackVtapStore {
+    private final Logger log = getLogger(getClass());
+
+    /** Storage service used in {@code activate()} to build the distributed vTap map. */
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    /** Distributed map holding every vTap, keyed by vTap identifier. */
+    private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
+
+    /** Map listener that keeps the per-device indexes current and notifies the delegate. */
+    private final MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
+                                            vTapListener = new VTapEventListener();
+
+    /** {@code java.util.Map} view over {@code vTapConsistentMap}. */
+    private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
+
+    /** Serializer for the vTap map: the API namespace plus the vTap value types. */
+    private static final Serializer SERIALIZER = Serializer
+            .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
+                    .register(OpenstackVtapId.class)
+                    .register(UUID.class)
+                    .register(DefaultOpenstackVtap.class)
+                    .register(OpenstackVtap.Type.class)
+                    .register(DefaultOpenstackVtapCriterion.class)
+                    .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+                    .build());
+
+    /** Identifiers of the vTaps that list a given device in their TX device set. */
+    private final Map<DeviceId, Set<OpenstackVtapId>>
+                                    vTapIdsByTxDeviceId = Maps.newConcurrentMap();
+
+    /** Identifiers of the vTaps that list a given device in their RX device set. */
+    private final Map<DeviceId, Set<OpenstackVtapId>>
+                                    vTapIdsByRxDeviceId = Maps.newConcurrentMap();
+
+    /** Single-threaded executor on which index rebuilds are run. */
+    private ScheduledExecutorService eventExecutor;
+
+    /** Status listener that schedules an index rebuild when the map reports ACTIVE. */
+    private Consumer<Status> vTapStatusListener;
+
+    /** Error message used when a vTap is created from an incomplete description. */
+    public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
+
+    /**
+     * Activates the store.
+     * <p>
+     * Creates the single-threaded event executor, builds the {@code "vTapMap"}
+     * consistent map, registers the map event listener, and installs a status
+     * listener that rebuilds the per-device vTap indexes whenever the map
+     * reports {@link Status#ACTIVE}.
+     */
+    @Activate
+    public void activate() {
+        eventExecutor = newSingleThreadScheduledExecutor(
+                groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+        vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
+                consistentMapBuilder()
+                .withName("vTapMap")
+                .withSerializer(SERIALIZER)
+                .build();
+
+        vTapMap = vTapConsistentMap.asJavaMap();
+        vTapConsistentMap.addListener(vTapListener);
+
+        // The rebuild is handed to the event executor instead of running
+        // on the thread that delivers the status change.
+        vTapStatusListener = status -> {
+            if (status == Status.ACTIVE) {
+                eventExecutor.execute(this::loadVTapIds);
+            }
+        };
+        vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
+
+        // Exactly one placeholder for the single argument supplied.
+        log.info("Started {}", this.getClass().getSimpleName());
+    }
+
+    /**
+     * Deactivates the store: unregisters the status and map listeners that
+     * were installed on the vTap map and shuts the event executor down.
+     */
+    @Deactivate
+    public void deactivate() {
+        vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
+        vTapConsistentMap.removeListener(vTapListener);
+        eventExecutor.shutdown();
+
+        // Exactly one placeholder for the single argument supplied.
+        log.info("Stopped {}", this.getClass().getSimpleName());
+    }
+
+    /**
+     * Rebuilds both per-device vTap indexes from scratch: empties them and
+     * then re-registers every vTap currently held in the vTap map.
+     */
+    private void loadVTapIds() {
+        vTapIdsByTxDeviceId.clear();
+        vTapIdsByRxDeviceId.clear();
+        for (DefaultOpenstackVtap stored : vTapMap.values()) {
+            // A null "old" value makes every device of the vTap count as newly added.
+            refreshDeviceIdsByVtap(null, stored);
+        }
+    }
+
+    /**
+     * Decides whether applying a description to a stored vTap would change it.
+     * <p>
+     * {@code null} fields of the description are treated as "leave unchanged"
+     * and never trigger an update by themselves.
+     *
+     * @param existing       vTap currently stored, or {@code null} if none
+     * @param description    values that are about to be applied
+     * @param replaceDevices {@code true} if the described device sets replace
+     *                       the stored ones, {@code false} if they are added
+     * @return {@code true} if nothing is stored yet or if the type, criterion,
+     *         device sets or any described annotation differs
+     */
+    private boolean shouldUpdate(DefaultOpenstackVtap existing,
+                                 OpenstackVtap description,
+                                 boolean replaceDevices) {
+        if (existing == null) {
+            return true;
+        }
+
+        if (description.type() != null
+                && !description.type().equals(existing.type())) {
+            return true;
+        }
+        if (description.vTapCriterion() != null
+                && !description.vTapCriterion().equals(existing.vTapCriterion())) {
+            return true;
+        }
+
+        // Replacing requires identical sets; adding only requires that the
+        // stored set already covers everything described.
+        Set<DeviceId> describedTx = description.txDeviceIds();
+        if (describedTx != null) {
+            boolean txSatisfied = replaceDevices
+                    ? existing.txDeviceIds().equals(describedTx)
+                    : existing.txDeviceIds().containsAll(describedTx);
+            if (!txSatisfied) {
+                return true;
+            }
+        }
+
+        Set<DeviceId> describedRx = description.rxDeviceIds();
+        if (describedRx != null) {
+            boolean rxSatisfied = replaceDevices
+                    ? existing.rxDeviceIds().equals(describedRx)
+                    : existing.rxDeviceIds().containsAll(describedRx);
+            if (!rxSatisfied) {
+                return true;
+            }
+        }
+
+        // Finally compare each annotation carried by the description with
+        // the value stored under the same key.
+        return !description.annotations().keys().stream()
+                .allMatch(key -> Objects.equals(description.annotations().value(key),
+                        existing.annotations().value(key)));
+    }
+
+    /**
+     * Creates the vTap stored under the given identifier, or merges the given
+     * description into the vTap already stored there.
+     * <p>
+     * Fields of the description that are {@code null} keep the stored value.
+     * Described device sets replace the stored sets when {@code replaceFlag}
+     * is set (or when nothing is stored yet) and are added to them otherwise.
+     * Annotations of a stored vTap are merged with those of the description.
+     * When the description would not change the stored vTap, the stored
+     * instance is kept as is.
+     *
+     * @param vTapId      identifier of the vTap to create or update
+     * @param description values to apply
+     * @param replaceFlag {@code true} to replace the stored device sets,
+     *                    {@code false} to extend them
+     * @return the vTap held by the map after the operation
+     * @throws IllegalStateException raised inside the map computation when no
+     *         vTap is stored and the description lacks a type, a criterion,
+     *         a TX device set or an RX device set
+     */
+    @Override
+    public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
+                                            OpenstackVtap description,
+                                            boolean replaceFlag) {
+
+        return vTapMap.compute(vTapId, (key, current) -> {
+            boolean creating = (current == null);
+
+            // A brand-new vTap cannot fall back on stored values, so the
+            // description has to be complete.
+            if (creating &&
+                    (description.type() == null ||
+                     description.vTapCriterion() == null ||
+                     description.txDeviceIds() == null ||
+                     description.rxDeviceIds() == null)) {
+                checkState(false, INVALID_DESCRIPTION);
+                return null;
+            }
+
+            if (!shouldUpdate(current, description, replaceFlag)) {
+                return current;
+            }
+
+            // Scalar items: the description wins whenever it provides a value.
+            OpenstackVtap.Type type = description.type() != null ?
+                    description.type() : current.type();
+            OpenstackVtapCriterion vTapCriterion = description.vTapCriterion() != null ?
+                    description.vTapCriterion() : current.vTapCriterion();
+
+            // Device sets: keep, overwrite, or extend.
+            boolean overwrite = creating || replaceFlag;
+
+            Set<DeviceId> txDeviceIds;
+            if (description.txDeviceIds() == null) {
+                txDeviceIds = current.txDeviceIds();
+            } else if (overwrite) {
+                txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
+            } else {
+                txDeviceIds = Sets.newHashSet(current.txDeviceIds());
+                txDeviceIds.addAll(description.txDeviceIds());
+            }
+
+            Set<DeviceId> rxDeviceIds;
+            if (description.rxDeviceIds() == null) {
+                rxDeviceIds = current.rxDeviceIds();
+            } else if (overwrite) {
+                rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
+            } else {
+                rxDeviceIds = Sets.newHashSet(current.rxDeviceIds());
+                rxDeviceIds.addAll(description.rxDeviceIds());
+            }
+
+            // Annotations: taken as-is on creation, merged on update.
+            SparseAnnotations annotations = creating ?
+                    (SparseAnnotations) description.annotations() :
+                    merge((DefaultAnnotations) current.annotations(),
+                            (SparseAnnotations) description.annotations());
+
+            return DefaultOpenstackVtap.builder()
+                    .id(vTapId)
+                    .type(type)
+                    .vTapCriterion(vTapCriterion)
+                    .txDeviceIds(txDeviceIds)
+                    .rxDeviceIds(rxDeviceIds)
+                    .annotations(annotations)
+                    .build();
+        });
+    }
+
+    /**
+     * Removes the vTap stored under the given identifier.
+     *
+     * @param vTapId identifier of the vTap to remove
+     * @return the value the vTap map reports as removed
+     */
+    @Override
+    public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
+        final OpenstackVtap removed = vTapMap.remove(vTapId);
+        return removed;
+    }
+
+    @Override
+    public boolean addDeviceToVtap(OpenstackVtapId vTapId,
+                                   OpenstackVtap.Type type,
+                                   DeviceId deviceId) {
+        checkNotNull(vTapId);
+        checkNotNull(deviceId);
+
+        OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
+            if (existing == null) {
+                return null;
+            }
+            if (!existing.type().isValid(type)) {
+                log.error("Not valid OpenstackVtap type {} for requested type {}",
+                        existing.type(), type);
+                return existing;
+            }
+
+            Set<DeviceId> txDeviceIds = null;
+            if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
+                    !existing.txDeviceIds().contains(deviceId)) {
+                txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
+                txDeviceIds.add(deviceId);
+            }
+
+            Set<DeviceId> rxDeviceIds = null;
+            if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
+                    !existing.rxDeviceIds().contains(deviceId)) {
+                rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
+                rxDeviceIds.add(deviceId);
+            }
+
+            if (txDeviceIds != null || rxDeviceIds != null) {
+                //updateVTapIdFromDeviceId(existing.id(), deviceId);    // execute from event listener
+
+                return DefaultOpenstackVtap.builder()
+                        .id(vTapId)
+                        .type(existing.type())
+                        .vTapCriterion(existing.vTapCriterion())
+                        .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
+                        .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
+                        .annotations(existing.annotations())
+                        .build();
+            }
+            return existing;
+        });
+        return (vTap != null);
+    }
+
+    @Override
+    public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
+                                        OpenstackVtap.Type type,
+                                        DeviceId deviceId) {
+        checkNotNull(vTapId);
+        checkNotNull(deviceId);
+
+        OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
+            if (existing == null) {
+                return null;
+            }
+            if (!existing.type().isValid(type)) {
+                log.error("Not valid OpenstackVtap type {} for requested type {}",
+                        existing.type(), type);
+                return existing;
+            }
+
+            Set<DeviceId> txDeviceIds = null;
+            if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
+                    existing.txDeviceIds().contains(deviceId)) {
+                txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
+                txDeviceIds.remove(deviceId);
+            }
+
+            Set<DeviceId> rxDeviceIds = null;
+            if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
+                    existing.rxDeviceIds().contains(deviceId)) {
+                rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
+                rxDeviceIds.remove(deviceId);
+            }
+
+            if (txDeviceIds != null || rxDeviceIds != null) {
+                //removeVTapIdFromDeviceId(existing.id(), deviceId);    // execute from event listener
+
+                return DefaultOpenstackVtap.builder()
+                        .id(vTapId)
+                        .type(existing.type())
+                        .vTapCriterion(existing.vTapCriterion())
+                        .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
+                        .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
+                        .annotations(existing.annotations())
+                        .build();
+            }
+            return existing;
+        });
+        return (vTap != null);
+    }
+
+    /**
+     * Replaces or extends the TX and RX device sets of a stored vTap.
+     * <p>
+     * With {@code replaceDevices} set, a stored set is overwritten when it
+     * differs from the given one; otherwise the given devices are added when
+     * the stored set does not already contain all of them. If neither set
+     * needs to change, the stored vTap is kept as is.
+     *
+     * @param vTapId         identifier of the vTap to modify
+     * @param txDeviceIds    TX devices to apply
+     * @param rxDeviceIds    RX devices to apply
+     * @param replaceDevices {@code true} to replace the stored sets,
+     *                       {@code false} to add to them
+     * @return {@code true} if a vTap is stored under the identifier after the
+     *         call, whether or not it was modified; {@code false} otherwise
+     */
+    @Override
+    public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
+                                       Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
+                                       boolean replaceDevices) {
+        checkNotNull(vTapId);
+        checkNotNull(txDeviceIds);
+        checkNotNull(rxDeviceIds);
+
+        OpenstackVtap result = vTapMap.compute(vTapId, (key, current) -> {
+            if (current == null) {
+                return null;
+            }
+
+            boolean txChanged = replaceDevices
+                    ? !current.txDeviceIds().equals(txDeviceIds)
+                    : !current.txDeviceIds().containsAll(txDeviceIds);
+            boolean rxChanged = replaceDevices
+                    ? !current.rxDeviceIds().equals(rxDeviceIds)
+                    : !current.rxDeviceIds().containsAll(rxDeviceIds);
+            if (!txChanged && !rxChanged) {
+                return current;
+            }
+
+            Set<DeviceId> nextTx = current.txDeviceIds();
+            if (txChanged) {
+                if (replaceDevices) {
+                    nextTx = ImmutableSet.copyOf(txDeviceIds);
+                } else {
+                    nextTx = Sets.newHashSet(nextTx);
+                    nextTx.addAll(txDeviceIds);
+                }
+            }
+
+            Set<DeviceId> nextRx = current.rxDeviceIds();
+            if (rxChanged) {
+                if (replaceDevices) {
+                    nextRx = ImmutableSet.copyOf(rxDeviceIds);
+                } else {
+                    nextRx = Sets.newHashSet(nextRx);
+                    nextRx.addAll(rxDeviceIds);
+                }
+            }
+
+            return DefaultOpenstackVtap.builder()
+                    .id(vTapId)
+                    .type(current.type())
+                    .vTapCriterion(current.vTapCriterion())
+                    .txDeviceIds(nextTx)
+                    .rxDeviceIds(nextRx)
+                    .annotations(current.annotations())
+                    .build();
+        });
+        return result != null;
+    }
+
+    /**
+     * Counts the stored vTaps whose type reports {@code isValid(type)}.
+     *
+     * @param type vTap type to match against
+     * @return number of matching vTaps, narrowed to {@code int}
+     */
+    @Override
+    public int getVtapCount(OpenstackVtap.Type type) {
+        // Sequential on purpose: a filter/count over the map view does not
+        // justify borrowing the shared fork-join pool.
+        return (int) vTapMap.values().stream()
+                .filter(vTap -> vTap.type().isValid(type))
+                .count();
+    }
+
+    /**
+     * Returns the stored vTaps whose type reports {@code isValid(type)}.
+     *
+     * @param type vTap type to match against
+     * @return immutable set of the matching vTaps; empty if none match
+     */
+    @Override
+    public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
+        // Sequential on purpose: a simple filter over the map view does not
+        // justify borrowing the shared fork-join pool.
+        return ImmutableSet.copyOf(
+                vTapMap.values().stream()
+                        .filter(vTap -> vTap.type().isValid(type))
+                        .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Looks up a single vTap by identifier.
+     *
+     * @param vTapId identifier of the vTap to fetch
+     * @return the value the vTap map holds for the identifier
+     */
+    @Override
+    public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
+        final OpenstackVtap found = vTapMap.get(vTapId);
+        return found;
+    }
+
+    /**
+     * Returns the vTaps that reference the given device in the device set(s)
+     * selected by the given type.
+     * <p>
+     * The TX index is consulted when {@code type.isValid(VTAP_TX)} holds and
+     * the RX index when {@code type.isValid(VTAP_RX)} holds. A device that
+     * has no entry in an index simply contributes nothing.
+     *
+     * @param type     selects the TX and/or RX index
+     * @param deviceId device to look up
+     * @return immutable set of matching vTaps; empty if the device is not
+     *         referenced by any vTap
+     */
+    @Override
+    public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
+                                                 DeviceId deviceId) {
+        Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
+        if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
+            // The index holds no entry for devices without vTaps, and
+            // addAll(null) would throw, so guard the lookup result.
+            Set<OpenstackVtapId> txIds = vTapIdsByTxDeviceId.get(deviceId);
+            if (txIds != null) {
+                vtapIds.addAll(txIds);
+            }
+        }
+        if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
+            Set<OpenstackVtapId> rxIds = vTapIdsByRxDeviceId.get(deviceId);
+            if (rxIds != null) {
+                vtapIds.addAll(rxIds);
+            }
+        }
+
+        // Identifiers whose vTap is no longer in the map are dropped.
+        return ImmutableSet.copyOf(
+                vtapIds.stream()
+                        .map(vTapId -> vTapMap.get(vTapId))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Orders vTaps by the reverse of their type ordering and, for equal
+     * types, by descending IP protocol value of their criterion.
+     */
+    private static class VTapComparator implements Comparator<OpenstackVtap> {
+        @Override
+        public int compare(OpenstackVtap v1, OpenstackVtap v2) {
+            int diff = v2.type().compareTo(v1.type());
+            if (diff != 0) {
+                return diff;
+            }
+            // Integer.compare rather than subtraction: the sign is correct
+            // for any operand values.
+            return Integer.compare(v2.vTapCriterion().ipProtocol(),
+                    v1.vTapCriterion().ipProtocol());
+        }
+    }
+
+    /**
+     * Creates a new concurrent set that contains only the given identifier.
+     *
+     * @param vTapId identifier to place in the new set
+     * @return freshly created concurrent set holding {@code vTapId}
+     */
+    private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
+        return Sets.newConcurrentHashSet(ImmutableSet.of(vTapId));
+    }
+
+    /**
+     * Adds an identifier to an existing set, modifying that set in place.
+     *
+     * @param existingVtapIds set to extend
+     * @param vTapId          identifier to add
+     * @return the same set instance that was passed in
+     */
+    private static Set<OpenstackVtapId> updateVTapIds(final Set<OpenstackVtapId> existingVtapIds,
+                                                      final OpenstackVtapId vTapId) {
+        existingVtapIds.add(vTapId);
+
+        return existingVtapIds;
+    }
+
+    /**
+     * Removes an identifier from an existing set, modifying that set in place.
+     *
+     * @param existingVtapIds set to shrink
+     * @param vTapId          identifier to remove
+     * @return the same set instance, or {@code null} once it has become empty
+     */
+    private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
+                                                      OpenstackVtapId vTapId) {
+        existingVtapIds.remove(vTapId);
+        // Callers pass the null result to computeIfPresent.
+        return existingVtapIds.isEmpty() ? null : existingVtapIds;
+    }
+
+    /**
+     * Registers a vTap in the TX index under the given device.
+     *
+     * @param vTapId   identifier of the vTap to register
+     * @param deviceId device the vTap is registered under
+     */
+    private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+        vTapIdsByTxDeviceId.compute(deviceId, (device, ids) -> {
+            if (ids != null) {
+                return updateVTapIds(ids, vTapId);
+            }
+            return addVTapIds(vTapId);
+        });
+    }
+
+    /**
+     * Unregisters a vTap from the TX index entry of the given device, if the
+     * device has one.
+     *
+     * @param vTapId   identifier of the vTap to unregister
+     * @param deviceId device whose index entry is updated
+     */
+    private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+        vTapIdsByTxDeviceId.computeIfPresent(deviceId,
+                (device, ids) -> removeVTapIds(ids, vTapId));
+    }
+
+    /**
+     * Registers a vTap in the RX index under the given device.
+     *
+     * @param vTapId   identifier of the vTap to register
+     * @param deviceId device the vTap is registered under
+     */
+    private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+        vTapIdsByRxDeviceId.compute(deviceId, (device, ids) -> {
+            if (ids != null) {
+                return updateVTapIds(ids, vTapId);
+            }
+            return addVTapIds(vTapId);
+        });
+    }
+
+    /**
+     * Unregisters a vTap from the RX index entry of the given device, if the
+     * device has one.
+     *
+     * @param vTapId   identifier of the vTap to unregister
+     * @param deviceId device whose index entry is updated
+     */
+    private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+        vTapIdsByRxDeviceId.computeIfPresent(deviceId,
+                (device, ids) -> removeVTapIds(ids, vTapId));
+    }
+
+    /**
+     * Brings the per-device TX and RX indexes in line with a vTap change.
+     * <p>
+     * Devices present only in the old vTap are unregistered first, then
+     * devices present only in the new vTap are registered. A {@code null}
+     * old value registers every device of the new vTap; a {@code null} new
+     * value unregisters every device of the old one.
+     *
+     * @param oldOpenstackVtap vTap before the change, or {@code null}
+     * @param newOpenstackVtap vTap after the change, or {@code null}
+     */
+    private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
+                                        OpenstackVtap newOpenstackVtap) {
+        if (oldOpenstackVtap != null) {
+            Set<DeviceId> staleTx = oldOpenstackVtap.txDeviceIds();
+            Set<DeviceId> staleRx = oldOpenstackVtap.rxDeviceIds();
+            if (newOpenstackVtap != null) {
+                // Only devices that disappeared from the vTap are stale.
+                staleTx = Sets.difference(staleTx, newOpenstackVtap.txDeviceIds());
+                staleRx = Sets.difference(staleRx, newOpenstackVtap.rxDeviceIds());
+            }
+
+            for (DeviceId deviceId : staleTx) {
+                removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), deviceId);
+            }
+            for (DeviceId deviceId : staleRx) {
+                removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), deviceId);
+            }
+        }
+
+        if (newOpenstackVtap != null) {
+            Set<DeviceId> freshTx = newOpenstackVtap.txDeviceIds();
+            Set<DeviceId> freshRx = newOpenstackVtap.rxDeviceIds();
+            if (oldOpenstackVtap != null) {
+                // Only devices that were not part of the vTap before are fresh.
+                freshTx = Sets.difference(freshTx, oldOpenstackVtap.txDeviceIds());
+                freshRx = Sets.difference(freshRx, oldOpenstackVtap.rxDeviceIds());
+            }
+
+            for (DeviceId deviceId : freshTx) {
+                updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), deviceId);
+            }
+            for (DeviceId deviceId : freshRx) {
+                updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), deviceId);
+            }
+        }
+    }
+
+    /**
+     * Listener for changes of the vTap map. For each change it refreshes the
+     * per-device indexes and forwards a matching vTap event to the delegate.
+     */
+    private class VTapEventListener
+            implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
+        @Override
+        public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
+            // Unwrap the versioned values; either side may be absent.
+            DefaultOpenstackVtap oldValue =
+                    event.oldValue() == null ? null : event.oldValue().value();
+            DefaultOpenstackVtap newValue =
+                    event.newValue() == null ? null : event.newValue().value();
+
+            log.debug("VTapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
+            switch (event.type()) {
+                case INSERT:
+                    refreshDeviceIdsByVtap(oldValue, newValue);
+                    notifyDelegate(new OpenstackVtapEvent(
+                            OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
+                    break;
+
+                case UPDATE:
+                    // An update that leaves the value equal is not reported.
+                    if (Objects.equals(newValue, oldValue)) {
+                        break;
+                    }
+                    refreshDeviceIdsByVtap(oldValue, newValue);
+                    notifyDelegate(new OpenstackVtapEvent(
+                            OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
+                    break;
+
+                case REMOVE:
+                    refreshDeviceIdsByVtap(oldValue, newValue);
+                    notifyDelegate(new OpenstackVtapEvent(
+                            OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
+                    break;
+
+                default:
+                    log.warn("Unknown map event type: {}", event.type());
+            }
+        }
+    }
+}