[ONOS-5936] (vCore) Virtual FlowObjective Manager and Store

Changes
1. FlowObjective manager for virtual network is added
2. VirtualFlowObjective store is added
3. SimpleVirtualFlowObjectiveStore is implementation
4. Unit tests are added

Change-Id: I18ff1d440d1f85ca96fff36a33a8b67566031e2c
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java
new file mode 100644
index 0000000..1ac5396
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.net.behaviour.DefaultNextGroup;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Single instance implementation of store to manage
+ * the inventory of created next groups for virtual network.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleVirtualFlowObjectiveStore
+        extends AbstractVirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
+        implements VirtualNetworkFlowObjectiveStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private ConcurrentMap<NetworkId, ConcurrentMap<Integer, byte[]>> nextGroupsMap;
+
+    private AtomicCounter nextIds;
+
+    // event queue to separate map-listener threads from event-handler threads (tpool)
+    private BlockingQueue<VirtualObjectiveEvent> eventQ;
+    private ExecutorService tpool;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Activate
+    public void activate() {
+        tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/virtual/flobj-notifier", "%d", log));
+        eventQ = new LinkedBlockingQueue<>();
+        tpool.execute(new FlowObjectiveNotifier());
+
+        nextGroupsMap = Maps.newConcurrentMap();
+
+        nextIds = storageService.getAtomicCounter("next-objective-counter");
+        log.info("Started");
+    }
+
+    private ConcurrentMap<Integer, byte[]> getNextGroups(NetworkId networkId) {
+        nextGroupsMap.computeIfAbsent(networkId, n -> Maps.newConcurrentMap());
+        return nextGroupsMap.get(networkId);
+    }
+
+    @Override
+    public void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+        nextGroups.put(nextId, group.data());
+
+        eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.ADD, nextId));
+    }
+
+    @Override
+    public NextGroup getNextGroup(NetworkId networkId, Integer nextId) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+        return new DefaultNextGroup(nextGroups.get(nextId));
+    }
+
+    @Override
+    public NextGroup removeNextGroup(NetworkId networkId, Integer nextId) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+        eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.REMOVE, nextId));
+        return new DefaultNextGroup(nextGroups.remove(nextId));
+    }
+
+    @Override
+    public Map<Integer, NextGroup> getAllGroups(NetworkId networkId) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+
+        Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
+        for (int key : nextGroups.keySet()) {
+            NextGroup nextGroup = getNextGroup(networkId, key);
+            if (nextGroup != null) {
+                nextGroupMappings.put(key, nextGroup);
+            }
+        }
+        return nextGroupMappings;
+    }
+
+    @Override
+    public int allocateNextId(NetworkId networkId) {
+        return (int) nextIds.incrementAndGet();
+    }
+
+    private class FlowObjectiveNotifier implements Runnable {
+        @Override
+        public void run() {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    VirtualObjectiveEvent vEvent = eventQ.take();
+                    notifyDelegate(vEvent.networkId(), vEvent);
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private class VirtualObjectiveEvent extends ObjectiveEvent {
+        NetworkId networkId;
+
+        public VirtualObjectiveEvent(NetworkId networkId, Type type,
+                                     Integer objective) {
+            super(type, objective);
+            this.networkId = networkId;
+        }
+
+        NetworkId networkId() {
+            return networkId;
+        }
+    }
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
index 5f71b57..0952574 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
@@ -151,8 +151,12 @@
 
     @Override
     public int getFlowRuleCount(NetworkId networkId) {
-
         int sum = 0;
+
+        if (flowEntries.get(networkId) == null) {
+            return 0;
+        }
+
         for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft :
                 flowEntries.get(networkId).values()) {
             for (List<StoredFlowEntry> fes : ft.values()) {