ONOS-6096 initial impl of dist. virtual packet store; virtual PacketRequest CLI; PacketRequest codec

Change-Id: Iea0a159a977701685c4487e806b26c85a1fcc1a5
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java
new file mode 100644
index 0000000..18a8cc3
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java
@@ -0,0 +1,414 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketEvent;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+import org.onosproject.net.packet.PacketStoreDelegate;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.packet.PacketEvent.Type.EMIT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed virtual packet store implementation allowing packets to be sent to
+ * remote instances.  Implementation is based on DistributedPacketStore class.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class DistributedVirtualPacketStore
+        extends AbstractVirtualStore<PacketEvent, PacketStoreDelegate>
+        implements VirtualNetworkPacketStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
+    private PacketRequestTracker tracker;
+
+    private static final MessageSubject PACKET_OUT_SUBJECT =
+            new MessageSubject("virtual-packet-out");
+
+    private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
+
+    private ExecutorService messageHandlingExecutor;
+
+    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Size of thread pool to assign message handler")
+    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        cfgService.registerProperties(getClass());
+
+        modified(context);
+
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                messageHandlerThreadPoolSize,
+                groupedThreads("onos/store/packet", "message-handlers", log));
+
+        communicationService.<OutboundPacketWrapper>addSubscriber(PACKET_OUT_SUBJECT,
+                SERIALIZER::decode,
+                packetWrapper -> notifyDelegate(packetWrapper.networkId,
+                                                new PacketEvent(EMIT,
+                                                                packetWrapper.outboundPacket)),
+                messageHandlingExecutor);
+
+        tracker = new PacketRequestTracker();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
+        communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
+        messageHandlingExecutor.shutdown();
+        tracker = null;
+        log.info("Stopped");
+    }
+
+    @Modified
+    public void  modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+        int newMessageHandlerThreadPoolSize;
+
+        try {
+            String s = get(properties, "messageHandlerThreadPoolSize");
+
+            newMessageHandlerThreadPoolSize =
+                    isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException e) {
+            log.warn(e.getMessage());
+            newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+        }
+
+        // Any change in the following parameters implies thread pool restart
+        if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+            setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+            restartMessageHandlerThreadPool();
+        }
+
+        log.info(FORMAT, messageHandlerThreadPoolSize);
+    }
+
+    @Override
+    public void emit(NetworkId networkId, OutboundPacket packet) {
+        NodeId myId = clusterService.getLocalNode().id();
+        // TODO revive this when there is MastershipService support for virtual devices
+//        NodeId master = mastershipService.getMasterFor(packet.sendThrough());
+//
+//        if (master == null) {
+//            log.warn("No master found for {}", packet.sendThrough());
+//            return;
+//        }
+//
+//        log.debug("master {} found for {}", myId, packet.sendThrough());
+//        if (myId.equals(master)) {
+//            notifyDelegate(networkId, new PacketEvent(EMIT, packet));
+//            return;
+//        }
+//
+//        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+//                            .whenComplete((r, error) -> {
+//                                if (error != null) {
+//                                    log.warn("Failed to send packet-out to {}", master, error);
+//                                }
+//                            });
+    }
+
+    @Override
+    public void requestPackets(NetworkId networkId, PacketRequest request) {
+        tracker.add(networkId, request);
+
+    }
+
+    @Override
+    public void cancelPackets(NetworkId networkId, PacketRequest request) {
+        tracker.remove(networkId, request);
+    }
+
+    @Override
+    public List<PacketRequest> existingRequests(NetworkId networkId) {
+        return tracker.requests(networkId);
+    }
+
+    private final class PacketRequestTracker {
+
+        private ConsistentMap<NetworkId, Map<RequestKey, Set<PacketRequest>>> distRequests;
+        private Map<NetworkId, Map<RequestKey, Set<PacketRequest>>> requests;
+
+        private PacketRequestTracker() {
+            distRequests = storageService.<NetworkId, Map<RequestKey, Set<PacketRequest>>>consistentMapBuilder()
+                    .withName("onos-virtual-packet-requests")
+                    .withSerializer(Serializer.using(KryoNamespace.newBuilder()
+                            .register(KryoNamespaces.API)
+                            .register(RequestKey.class)
+                            .register(NetworkId.class)
+                            .build()))
+                    .build();
+            requests = distRequests.asJavaMap();
+        }
+
+        private void add(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean firstRequest = addInternal(networkId, request);
+            PacketStoreDelegate delegate = delegateMap.get(networkId);
+            if (firstRequest.get() && delegate != null) {
+                // The instance that makes the first request will push to all devices
+                delegate.requestPackets(request);
+            }
+        }
+
+        private AtomicBoolean addInternal(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean firstRequest = new AtomicBoolean(false);
+            AtomicBoolean changed = new AtomicBoolean(true);
+            Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+            requestsForNetwork.compute(key(request), (s, existingRequests) -> {
+                // Reset to false just in case this is a retry due to
+                // ConcurrentModificationException
+                firstRequest.set(false);
+                if (existingRequests == null) {
+                    firstRequest.set(true);
+                    return ImmutableSet.of(request);
+                } else if (!existingRequests.contains(request)) {
+                    firstRequest.set(true);
+                    return ImmutableSet.<PacketRequest>builder()
+                                       .addAll(existingRequests)
+                                       .add(request)
+                                       .build();
+                } else {
+                    changed.set(false);
+                    return existingRequests;
+                }
+            });
+            if (changed.get()) {
+                requests.put(networkId, requestsForNetwork);
+            }
+            return firstRequest;
+        }
+
+        private void remove(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean removedLast = removeInternal(networkId, request);
+            PacketStoreDelegate delegate = delegateMap.get(networkId);
+            if (removedLast.get() && delegate != null) {
+                // The instance that removes the last request will remove from all devices
+                delegate.cancelPackets(request);
+            }
+        }
+
+        private AtomicBoolean removeInternal(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean removedLast = new AtomicBoolean(false);
+            AtomicBoolean changed = new AtomicBoolean(true);
+            Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+            requestsForNetwork.computeIfPresent(key(request), (s, existingRequests) -> {
+                // Reset to false just in case this is a retry due to
+                // ConcurrentModificationException
+                removedLast.set(false);
+                if (existingRequests.contains(request)) {
+                    Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+                    newRequests.remove(request);
+                    if (newRequests.size() > 0) {
+                        return ImmutableSet.copyOf(newRequests);
+                    } else {
+                        removedLast.set(true);
+                        return null;
+                    }
+                } else {
+                    changed.set(false);
+                    return existingRequests;
+                }
+            });
+            if (changed.get()) {
+                requests.put(networkId, requestsForNetwork);
+            }
+            return removedLast;
+        }
+
+        private List<PacketRequest> requests(NetworkId networkId) {
+            Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+            List<PacketRequest> list = Lists.newArrayList();
+            requestsForNetwork.values().forEach(v -> list.addAll(v));
+            list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+            return list;
+        }
+
+        /*
+         * Gets PacketRequests for specified networkId.
+         */
+        private Map<RequestKey, Set<PacketRequest>> getMap(NetworkId networkId) {
+            return requests.computeIfAbsent(networkId, networkId1 -> {
+                        log.debug("Creating new map for {}", networkId1);
+                        Map newMap = Maps.newHashMap();
+                        return newMap;
+                    });
+        }
+    }
+
+    /**
+     * Creates a new request key from a packet request.
+     *
+     * @param request packet request
+     * @return request key
+     */
+    private static RequestKey key(PacketRequest request) {
+        return new RequestKey(request.selector(), request.priority());
+    }
+
+    /**
+     * Key of a packet request.
+     */
+    private static final class RequestKey {
+        private final TrafficSelector selector;
+        private final PacketPriority priority;
+
+        private RequestKey(TrafficSelector selector, PacketPriority priority) {
+            this.selector = selector;
+            this.priority = priority;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(selector, priority);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other == this) {
+                return true;
+            }
+
+            if (!(other instanceof RequestKey)) {
+                return false;
+            }
+
+            RequestKey that = (RequestKey) other;
+
+            return Objects.equals(selector, that.selector) &&
+                    Objects.equals(priority, that.priority);
+        }
+    }
+
+    private static OutboundPacketWrapper wrapper(NetworkId networkId, OutboundPacket outboundPacket) {
+        return new OutboundPacketWrapper(networkId, outboundPacket);
+    }
+
+    /*
+     * OutboundPacket in
+     */
+    private static final class OutboundPacketWrapper {
+        private NetworkId networkId;
+        private OutboundPacket outboundPacket;
+
+        private OutboundPacketWrapper(NetworkId networkId, OutboundPacket outboundPacket) {
+            this.networkId = networkId;
+            this.outboundPacket = outboundPacket;
+        }
+
+    }
+
+    /**
+     * Sets thread pool size of message handler.
+     *
+     * @param poolSize
+     */
+    private void setMessageHandlerThreadPoolSize(int poolSize) {
+        checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+        messageHandlerThreadPoolSize = poolSize;
+    }
+
+    /**
+     * Restarts thread pool of message handler.
+     */
+    private void restartMessageHandlerThreadPool() {
+        ExecutorService prevExecutor = messageHandlingExecutor;
+        messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
+                                                     groupedThreads("DistPktStore", "messageHandling-%d", log));
+        prevExecutor.shutdown();
+    }
+
+    /**
+     * Gets current thread pool size of message handler.
+     *
+     * @return messageHandlerThreadPoolSize
+     */
+    private int getMessageHandlerThreadPoolSize() {
+        return messageHandlerThreadPoolSize;
+    }
+}