ONOS-6096 initial impl of dist. virtual packet store; virtual PacketRequest CLI; PacketRequest codec

Change-Id: Iea0a159a977701685c4487e806b26c85a1fcc1a5
diff --git a/cli/src/main/java/org/onosproject/cli/net/vnet/VirtualNetworkPacketRequestCommand.java b/cli/src/main/java/org/onosproject/cli/net/vnet/VirtualNetworkPacketRequestCommand.java
new file mode 100644
index 0000000..4cd1c98
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/vnet/VirtualNetworkPacketRequestCommand.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.cli.net.vnet;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.packet.Ip6Address;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.VlanId;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cli.net.EthType;
+import org.onosproject.cli.net.ExtHeader;
+import org.onosproject.cli.net.Icmp6Code;
+import org.onosproject.cli.net.Icmp6Type;
+import org.onosproject.cli.net.IpProtocol;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+import org.onosproject.net.packet.PacketService;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+/**
+ * Tests virtual network packet requests.
+ */
+@Command(scope = "onos", name = "vnet-packet",
+        description = "Tests virtual network packet requests")
+public class VirtualNetworkPacketRequestCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "command",
+            description = "Command name (requestPackets|getRequests|cancelPackets)",
+            required = true, multiValued = false)
+    private String command = null;
+
+    @Argument(index = 1, name = "networkId", description = "Network ID",
+            required = true, multiValued = false)
+    private Long networkId = null;
+
+    @Option(name = "--deviceId", description = "Device ID",
+            required = false, multiValued = false)
+    private String deviceIdString = null;
+
+    // Traffic selector
+    @Option(name = "-s", aliases = "--ethSrc", description = "Source MAC Address",
+            required = false, multiValued = false)
+    private String srcMacString = null;
+
+    @Option(name = "-d", aliases = "--ethDst", description = "Destination MAC Address",
+            required = false, multiValued = false)
+    private String dstMacString = null;
+
+    @Option(name = "-t", aliases = "--ethType", description = "Ethernet Type",
+            required = false, multiValued = false)
+    private String ethTypeString = null;
+
+    @Option(name = "-v", aliases = "--vlan", description = "VLAN ID",
+            required = false, multiValued = false)
+    private String vlanString = null;
+
+    @Option(name = "--ipProto", description = "IP Protocol",
+            required = false, multiValued = false)
+    private String ipProtoString = null;
+
+    @Option(name = "--ipSrc", description = "Source IP Prefix",
+            required = false, multiValued = false)
+    private String srcIpString = null;
+
+    @Option(name = "--ipDst", description = "Destination IP Prefix",
+            required = false, multiValued = false)
+    private String dstIpString = null;
+
+    @Option(name = "--fLabel", description = "IPv6 Flow Label",
+            required = false, multiValued = false)
+    private String fLabelString = null;
+
+    @Option(name = "--icmp6Type", description = "ICMPv6 Type",
+            required = false, multiValued = false)
+    private String icmp6TypeString = null;
+
+    @Option(name = "--icmp6Code", description = "ICMPv6 Code",
+            required = false, multiValued = false)
+    private String icmp6CodeString = null;
+
+    @Option(name = "--ndTarget", description = "IPv6 Neighbor Discovery Target Address",
+            required = false, multiValued = false)
+    private String ndTargetString = null;
+
+    @Option(name = "--ndSLL", description = "IPv6 Neighbor Discovery Source Link-Layer",
+            required = false, multiValued = false)
+    private String ndSllString = null;
+
+    @Option(name = "--ndTLL", description = "IPv6 Neighbor Discovery Target Link-Layer",
+            required = false, multiValued = false)
+    private String ndTllString = null;
+
+    @Option(name = "--tcpSrc", description = "Source TCP Port",
+            required = false, multiValued = false)
+    private String srcTcpString = null;
+
+    @Option(name = "--tcpDst", description = "Destination TCP Port",
+            required = false, multiValued = false)
+    private String dstTcpString = null;
+
+    @Option(name = "--extHdr", description = "IPv6 Extension Header Pseudo-field",
+            required = false, multiValued = true)
+    private List<String> extHdrStringList = null;
+
+    @Override
+    protected void execute() {
+        VirtualNetworkService service = get(VirtualNetworkService.class);
+        PacketService virtualPacketService = service.get(NetworkId.networkId(networkId), PacketService.class);
+
+        if (command == null) {
+            print("Command is not defined");
+            return;
+        }
+
+        if (command.equals("getRequests")) {
+            getRequests(virtualPacketService);
+            return;
+        }
+
+        TrafficSelector selector = buildTrafficSelector();
+        PacketPriority packetPriority = PacketPriority.CONTROL; //TODO allow user to specify
+        Optional<DeviceId> optionalDeviceId = null;
+        if (!isNullOrEmpty(deviceIdString)) {
+            optionalDeviceId = Optional.of(DeviceId.deviceId(deviceIdString));
+        }
+
+        if (command.equals("requestPackets")) {
+            if (optionalDeviceId != null) {
+                virtualPacketService.requestPackets(selector, packetPriority, appId(), optionalDeviceId);
+            } else {
+                virtualPacketService.requestPackets(selector, packetPriority, appId());
+            }
+            print("Virtual packet requested:\n%s", selector);
+            return;
+        }
+
+       if (command.equals("cancelPackets")) {
+            if (optionalDeviceId != null) {
+                virtualPacketService.cancelPackets(selector, packetPriority, appId(), optionalDeviceId);
+            } else {
+                virtualPacketService.cancelPackets(selector, packetPriority, appId());
+            }
+            print("Virtual packet cancelled:\n%s", selector);
+            return;
+        }
+
+        print("Unsupported command %s", command);
+    }
+
+    private void getRequests(PacketService packetService) {
+        List<PacketRequest> packetRequests = packetService.getRequests();
+        if (outputJson()) {
+            print("%s", json(packetRequests));
+        } else {
+            packetRequests.forEach(packetRequest -> print(packetRequest.toString()));
+        }
+    }
+
+    private JsonNode json(List<PacketRequest> packetRequests) {
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode result = mapper.createArrayNode();
+        packetRequests.forEach(packetRequest ->
+                                       result.add(jsonForEntity(packetRequest, PacketRequest.class)));
+        return result;
+    }
+
+    /**
+     * Constructs a traffic selector based on the command line arguments
+     * presented to the command.
+     * @return traffic selector
+     */
+    private TrafficSelector buildTrafficSelector() {
+        IpPrefix srcIpPrefix = null;
+        IpPrefix dstIpPrefix = null;
+
+        TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder();
+
+        if (!isNullOrEmpty(srcIpString)) {
+            srcIpPrefix = IpPrefix.valueOf(srcIpString);
+            if (srcIpPrefix.isIp4()) {
+                selectorBuilder.matchIPSrc(srcIpPrefix);
+            } else {
+                selectorBuilder.matchIPv6Src(srcIpPrefix);
+            }
+        }
+
+        if (!isNullOrEmpty(dstIpString)) {
+            dstIpPrefix = IpPrefix.valueOf(dstIpString);
+            if (dstIpPrefix.isIp4()) {
+                selectorBuilder.matchIPDst(dstIpPrefix);
+            } else {
+                selectorBuilder.matchIPv6Dst(dstIpPrefix);
+            }
+        }
+
+        if ((srcIpPrefix != null) && (dstIpPrefix != null) &&
+            (srcIpPrefix.version() != dstIpPrefix.version())) {
+            // ERROR: IP src/dst version mismatch
+            throw new IllegalArgumentException(
+                        "IP source and destination version mismatch");
+        }
+
+        //
+        // Set the default EthType based on the IP version if the matching
+        // source or destination IP prefixes.
+        //
+        Short ethType = null;
+        if ((srcIpPrefix != null) && srcIpPrefix.isIp6()) {
+            ethType = EthType.IPV6.value();
+        }
+        if ((dstIpPrefix != null) && dstIpPrefix.isIp6()) {
+            ethType = EthType.IPV6.value();
+        }
+        if (!isNullOrEmpty(ethTypeString)) {
+            ethType = EthType.parseFromString(ethTypeString);
+        }
+        if (ethType != null) {
+            selectorBuilder.matchEthType(ethType);
+        }
+        if (!isNullOrEmpty(vlanString)) {
+            selectorBuilder.matchVlanId(VlanId.vlanId(Short.parseShort(vlanString)));
+        }
+        if (!isNullOrEmpty(srcMacString)) {
+            selectorBuilder.matchEthSrc(MacAddress.valueOf(srcMacString));
+        }
+
+        if (!isNullOrEmpty(dstMacString)) {
+            selectorBuilder.matchEthDst(MacAddress.valueOf(dstMacString));
+        }
+
+        if (!isNullOrEmpty(ipProtoString)) {
+            short ipProtoShort = IpProtocol.parseFromString(ipProtoString);
+            selectorBuilder.matchIPProtocol((byte) ipProtoShort);
+        }
+
+        if (!isNullOrEmpty(fLabelString)) {
+            selectorBuilder.matchIPv6FlowLabel(Integer.parseInt(fLabelString));
+        }
+
+        if (!isNullOrEmpty(icmp6TypeString)) {
+            byte icmp6Type = Icmp6Type.parseFromString(icmp6TypeString);
+            selectorBuilder.matchIcmpv6Type(icmp6Type);
+        }
+
+        if (!isNullOrEmpty(icmp6CodeString)) {
+            byte icmp6Code = Icmp6Code.parseFromString(icmp6CodeString);
+            selectorBuilder.matchIcmpv6Code(icmp6Code);
+        }
+
+        if (!isNullOrEmpty(ndTargetString)) {
+            selectorBuilder.matchIPv6NDTargetAddress(Ip6Address.valueOf(ndTargetString));
+        }
+
+        if (!isNullOrEmpty(ndSllString)) {
+            selectorBuilder.matchIPv6NDSourceLinkLayerAddress(MacAddress.valueOf(ndSllString));
+        }
+
+        if (!isNullOrEmpty(ndTllString)) {
+            selectorBuilder.matchIPv6NDTargetLinkLayerAddress(MacAddress.valueOf(ndTllString));
+        }
+
+        if (!isNullOrEmpty(srcTcpString)) {
+            selectorBuilder.matchTcpSrc(TpPort.tpPort(Integer.parseInt(srcTcpString)));
+        }
+
+        if (!isNullOrEmpty(dstTcpString)) {
+            selectorBuilder.matchTcpDst(TpPort.tpPort(Integer.parseInt(dstTcpString)));
+        }
+
+        if (extHdrStringList != null) {
+            short extHdr = 0;
+            for (String extHdrString : extHdrStringList) {
+                extHdr = (short) (extHdr | ExtHeader.parseFromString(extHdrString));
+            }
+            selectorBuilder.matchIPv6ExthdrFlags(extHdr);
+        }
+
+        return selectorBuilder.build();
+    }
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 7b990c1..e5bf7fb 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -876,6 +876,16 @@
             </completers>
         </command>
         <command>
+            <action class="org.onosproject.cli.net.vnet.VirtualNetworkPacketRequestCommand"/>
+            <optional-completers>
+                <entry key="-t" value-ref="ethTypeCompleter"/>
+                <entry key="--ipProto" value-ref="ipProtocolCompleter"/>
+                <entry key="--icmp6Type" value-ref="Icmp6TypeCompleter"/>
+                <entry key="--icmp6Code" value-ref="Icmp6CodeCompleter"/>
+                <entry key="--extHdr" value-ref="ExtHeaderCompleter"/>
+            </optional-completers>
+        </command>
+        <command>
             <action class="org.onosproject.cli.net.DpisListCommand"/>
         </command>
 
diff --git a/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java b/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
index 67066a0..44bdacd 100644
--- a/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
+++ b/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
@@ -82,6 +82,7 @@
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.Meter;
 import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.packet.PacketRequest;
 import org.onosproject.net.region.Region;
 import org.onosproject.net.statistic.Load;
 import org.onosproject.net.topology.Topology;
@@ -169,6 +170,7 @@
         registerCodec(FlowStatInfo.class, new FlowStatInfoCodec());
         registerCodec(FilteredConnectPoint.class, new FilteredConnectPointCodec());
         registerCodec(TransportEndpointDescription.class, new TransportEndpointDescriptionCodec());
+        registerCodec(PacketRequest.class, new PacketRequestCodec());
         log.info("Started");
     }
 
diff --git a/core/common/src/main/java/org/onosproject/codec/impl/PacketRequestCodec.java b/core/common/src/main/java/org/onosproject/codec/impl/PacketRequestCodec.java
new file mode 100644
index 0000000..a66d827
--- /dev/null
+++ b/core/common/src/main/java/org/onosproject/codec/impl/PacketRequestCodec.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.codec.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.DefaultPacketRequest;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.nullIsIllegal;
+
+/**
+ * Codec for the PacketRequest class.
+ */
+
+// TODO: Needs unit test
+
+public class PacketRequestCodec extends JsonCodec<PacketRequest> {
+
+    // JSON field names
+    static final String TRAFFIC_SELECTOR = "selector";
+    static final String PRIORITY = "priority";
+    static final String APP_ID = "appId";
+    static final String NODE_ID = "nodeId";
+    static final String DEVICE_ID = "deviceId";
+
+    private static final String NULL_OBJECT_MSG = "PacketRequest cannot be null";
+    private static final String MISSING_MEMBER_MSG = " member is required in PacketRequest";
+    public static final String REST_APP_ID = "org.onosproject.rest";
+
+    @Override
+    public ObjectNode encode(PacketRequest packetRequest, CodecContext context) {
+        checkNotNull(packetRequest, NULL_OBJECT_MSG);
+
+        final JsonCodec<TrafficSelector> trafficSelectorCodec =
+               context.codec(TrafficSelector.class);
+        final ObjectNode result = context.mapper().createObjectNode()
+                .put(NODE_ID, packetRequest.nodeId().toString())
+                .put(PRIORITY, packetRequest.priority().name())
+                .put(APP_ID, packetRequest.appId().toString());
+        if (packetRequest.deviceId().isPresent()) {
+            result.put(DEVICE_ID, packetRequest.deviceId().get().toString());
+        }
+
+        result.set(TRAFFIC_SELECTOR, trafficSelectorCodec.encode(packetRequest.selector(), context));
+
+        return result;
+    }
+
+    @Override
+    public PacketRequest decode(ObjectNode json, CodecContext context) {
+        if (json == null || !json.isObject()) {
+            return null;
+        }
+
+        final JsonCodec<TrafficSelector> trafficSelectorCodec =
+               context.codec(TrafficSelector.class);
+        TrafficSelector trafficSelector = trafficSelectorCodec.decode(
+                get(json, TRAFFIC_SELECTOR), context);
+        NodeId nodeId = NodeId.nodeId(extractMember(NODE_ID, json));
+        PacketPriority priority = PacketPriority.valueOf(extractMember(PRIORITY, json));
+
+        CoreService coreService = context.getService(CoreService.class);
+        // TODO check appId (currently hardcoded - should it be read from json node?)
+        ApplicationId appId = coreService.registerApplication(REST_APP_ID);
+
+        DeviceId deviceId = null;
+        JsonNode node = json.get(DEVICE_ID);
+        if (node != null) {
+             deviceId = DeviceId.deviceId(node.asText());
+        }
+
+        return new DefaultPacketRequest(trafficSelector, priority, appId, nodeId, Optional.ofNullable(deviceId));
+    }
+
+    /**
+     * Extract member from JSON ObjectNode.
+     *
+     * @param key key for which value is needed
+     * @param json JSON ObjectNode
+     * @return member value
+     */
+    private String extractMember(String key, ObjectNode json) {
+        return nullIsIllegal(json.get(key), key + MISSING_MEMBER_MSG).asText();
+    }
+}
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
index 46b717e..44395ef 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
@@ -66,6 +66,7 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.intent.FakeIntentManager;
@@ -927,6 +928,7 @@
                 .add(VirtualNetworkFlowObjectiveStore.class, new SimpleVirtualFlowObjectiveStore());
 
         validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowRuleService.class);
+        validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowObjectiveService.class);
         validateServiceGetReturnsSavedInstance(virtualNetwork.id(), PacketService.class);
         validateServiceGetReturnsSavedInstance(virtualNetwork.id(), GroupService.class);
     }
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
index 1cedd4a..184a30e 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
@@ -50,12 +50,12 @@
 import org.onosproject.net.NetTestTools;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.Objective;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.intent.FakeIntentManager;
 import org.onosproject.net.intent.TestableIntentService;
 import org.onosproject.net.packet.DefaultOutboundPacket;
@@ -80,15 +80,18 @@
 import static org.onosproject.net.packet.PacketPriority.CONTROL;
 import static org.onosproject.net.packet.PacketPriority.REACTIVE;
 
+/**
+ * Junit tests for VirtualNetworkPacketManager using SimpleVirtualPacketStore.
+ */
 public class VirtualNetworkPacketManagerTest extends VirtualNetworkTestUtil {
 
     private static final int PROCESSOR_PRIORITY = 1;
 
-    private VirtualNetworkManager manager;
-    private DistributedVirtualNetworkStore virtualNetworkManagerStore;
+    protected VirtualNetworkManager manager;
+    protected DistributedVirtualNetworkStore virtualNetworkManagerStore;
     private CoreService coreService = new TestCoreService();
     private TestableIntentService intentService = new FakeIntentManager();
-    private TestServiceDirectory testDirectory;
+    protected TestServiceDirectory testDirectory;
     private EventDeliveryService eventDeliveryService;
     private VirtualProviderManager providerRegistryService;
 
@@ -96,9 +99,9 @@
     private VirtualNetwork vnet2;
 
     private VirtualPacketProvider provider = new TestPacketProvider();
-    private VirtualNetworkPacketStore packetStore = new SimpleVirtualPacketStore();
+    protected VirtualNetworkPacketStore packetStore = new SimpleVirtualPacketStore();
 
-    private VirtualNetworkPacketManager packetManager1;
+    protected VirtualNetworkPacketManager packetManager1;
     private VirtualNetworkPacketManager packetManager2;
 
     private ApplicationId appId = new TestApplicationId("VirtualPacketManagerTest");
@@ -106,13 +109,13 @@
     private VirtualFlowRuleProvider flowRuleProvider = new TestFlowRuleProvider();
     private SimpleVirtualFlowRuleStore flowRuleStore;
     private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
+    protected StorageService storageService = new TestStorageService();
 
     @Before
     public void setUp() throws TestUtils.TestUtilsException {
         virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
 
         TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
-        StorageService storageService = new TestStorageService();
         TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
         virtualNetworkManagerStore.activate();
 
@@ -293,7 +296,7 @@
         testFlowObjectiveService.validateObjectives(vnet1Devices, ts, CONTROL, REMOVE);
     }
 
-    private static OutboundPacket emittedPacket = null;
+    protected OutboundPacket emittedPacket = null;
 
     /**
      * Core service test class.
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerWithDistStoreTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerWithDistStoreTest.java
new file mode 100644
index 0000000..0159ae2
--- /dev/null
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerWithDistStoreTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.store.virtual.impl.DistributedVirtualPacketStore;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * Junit tests for VirtualNetworkPacketManager using DistributedVirtualPacketStore..
+ * This test class extends VirtualNetworkPacketManagerTest - all the tests defined in
+ * VirtualNetworkPacketManagerTest will run using DistributedVirtualPacketStore.
+ */
+public class VirtualNetworkPacketManagerWithDistStoreTest extends VirtualNetworkPacketManagerTest {
+
+    private DistributedVirtualPacketStore distStore;
+    private ClusterService clusterService = new ClusterServiceAdapter();
+
+    @Before
+    public void setUp() throws TestUtils.TestUtilsException {
+        setUpDistPacketStore();
+        super.setUp();
+        TestUtils.setField(packetManager1, "storageService", storageService);
+    }
+
+    private void setUpDistPacketStore() throws TestUtils.TestUtilsException {
+        distStore = new DistributedVirtualPacketStore();
+        TestUtils.setField(distStore, "cfgService", new ComponentConfigAdapter());
+        TestUtils.setField(distStore, "storageService", storageService);
+        TestUtils.setField(distStore, "clusterService", clusterService);
+        TestUtils.setField(distStore, "communicationService", new TestClusterCommunicationService());
+        TestUtils.setField(distStore, "mastershipService", new TestMastershipService());
+
+        distStore.activate(new ComponentContextAdapter());
+        packetStore = distStore; // super.setUp() will cause Distributed store to be used.
+    }
+
+    @After
+    public void tearDown() {
+        distStore.deactivate();
+    }
+
+    @Override
+    @Test
+    @Ignore("Ignore until there is MastershipService support for virtual devices")
+    public void emitTest() {
+        super.emitTest();
+    }
+
+    /**
+     * Tests the correct usage of emit() for a outbound packet - master of packet's
+     * sendThrough is not local node.
+     */
+    @Test
+    @Ignore("Ignore until there is MastershipService support for virtual devices")
+    public void emit2Test() {
+        OutboundPacket packet =
+                new DefaultOutboundPacket(VDID2, DefaultTrafficTreatment.emptyTreatment(), ByteBuffer.allocate(5));
+        packetManager1.emit(packet);
+        assertNull("Packet should not have been emmitted", emittedPacket);
+    }
+
+    private final class TestMastershipService extends MastershipServiceAdapter {
+        @Override
+        public NodeId getMasterFor(DeviceId deviceId) {
+            if (VDID1.equals(deviceId)) {
+                return clusterService.getLocalNode().id();
+            }
+            return new NodeId("abc");
+        }
+    }
+
+    private final class TestClusterCommunicationService extends ClusterCommunicationServiceAdapter {
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return new CompletableFuture<>();
+        }
+    }
+
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java
new file mode 100644
index 0000000..18a8cc3
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java
@@ -0,0 +1,414 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketEvent;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+import org.onosproject.net.packet.PacketStoreDelegate;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.packet.PacketEvent.Type.EMIT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed virtual packet store implementation allowing packets to be sent to
+ * remote instances.  Implementation is based on DistributedPacketStore class.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class DistributedVirtualPacketStore
+        extends AbstractVirtualStore<PacketEvent, PacketStoreDelegate>
+        implements VirtualNetworkPacketStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
+    private PacketRequestTracker tracker;
+
+    private static final MessageSubject PACKET_OUT_SUBJECT =
+            new MessageSubject("virtual-packet-out");
+
+    private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
+
+    private ExecutorService messageHandlingExecutor;
+
+    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Size of thread pool to assign message handler")
+    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        cfgService.registerProperties(getClass());
+
+        modified(context);
+
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                messageHandlerThreadPoolSize,
+                groupedThreads("onos/store/packet", "message-handlers", log));
+
+        communicationService.<OutboundPacketWrapper>addSubscriber(PACKET_OUT_SUBJECT,
+                SERIALIZER::decode,
+                packetWrapper -> notifyDelegate(packetWrapper.networkId,
+                                                new PacketEvent(EMIT,
+                                                                packetWrapper.outboundPacket)),
+                messageHandlingExecutor);
+
+        tracker = new PacketRequestTracker();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
+        communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
+        messageHandlingExecutor.shutdown();
+        tracker = null;
+        log.info("Stopped");
+    }
+
+    @Modified
+    public void  modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+        int newMessageHandlerThreadPoolSize;
+
+        try {
+            String s = get(properties, "messageHandlerThreadPoolSize");
+
+            newMessageHandlerThreadPoolSize =
+                    isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException e) {
+            log.warn(e.getMessage());
+            newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+        }
+
+        // Any change in the following parameters implies thread pool restart
+        if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+            setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+            restartMessageHandlerThreadPool();
+        }
+
+        log.info(FORMAT, messageHandlerThreadPoolSize);
+    }
+
+    @Override
+    public void emit(NetworkId networkId, OutboundPacket packet) {
+        NodeId myId = clusterService.getLocalNode().id();
+        // TODO revive this when there is MastershipService support for virtual devices
+//        NodeId master = mastershipService.getMasterFor(packet.sendThrough());
+//
+//        if (master == null) {
+//            log.warn("No master found for {}", packet.sendThrough());
+//            return;
+//        }
+//
+//        log.debug("master {} found for {}", myId, packet.sendThrough());
+//        if (myId.equals(master)) {
+//            notifyDelegate(networkId, new PacketEvent(EMIT, packet));
+//            return;
+//        }
+//
+//        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+//                            .whenComplete((r, error) -> {
+//                                if (error != null) {
+//                                    log.warn("Failed to send packet-out to {}", master, error);
+//                                }
+//                            });
+    }
+
+    @Override
+    public void requestPackets(NetworkId networkId, PacketRequest request) {
+        tracker.add(networkId, request);
+
+    }
+
+    @Override
+    public void cancelPackets(NetworkId networkId, PacketRequest request) {
+        tracker.remove(networkId, request);
+    }
+
+    @Override
+    public List<PacketRequest> existingRequests(NetworkId networkId) {
+        return tracker.requests(networkId);
+    }
+
+    private final class PacketRequestTracker {
+
+        private ConsistentMap<NetworkId, Map<RequestKey, Set<PacketRequest>>> distRequests;
+        private Map<NetworkId, Map<RequestKey, Set<PacketRequest>>> requests;
+
+        private PacketRequestTracker() {
+            distRequests = storageService.<NetworkId, Map<RequestKey, Set<PacketRequest>>>consistentMapBuilder()
+                    .withName("onos-virtual-packet-requests")
+                    .withSerializer(Serializer.using(KryoNamespace.newBuilder()
+                            .register(KryoNamespaces.API)
+                            .register(RequestKey.class)
+                            .register(NetworkId.class)
+                            .build()))
+                    .build();
+            requests = distRequests.asJavaMap();
+        }
+
+        private void add(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean firstRequest = addInternal(networkId, request);
+            PacketStoreDelegate delegate = delegateMap.get(networkId);
+            if (firstRequest.get() && delegate != null) {
+                // The instance that makes the first request will push to all devices
+                delegate.requestPackets(request);
+            }
+        }
+
+        private AtomicBoolean addInternal(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean firstRequest = new AtomicBoolean(false);
+            AtomicBoolean changed = new AtomicBoolean(true);
+            Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+            requestsForNetwork.compute(key(request), (s, existingRequests) -> {
+                // Reset to false just in case this is a retry due to
+                // ConcurrentModificationException
+                firstRequest.set(false);
+                if (existingRequests == null) {
+                    firstRequest.set(true);
+                    return ImmutableSet.of(request);
+                } else if (!existingRequests.contains(request)) {
+                    firstRequest.set(true);
+                    return ImmutableSet.<PacketRequest>builder()
+                                       .addAll(existingRequests)
+                                       .add(request)
+                                       .build();
+                } else {
+                    changed.set(false);
+                    return existingRequests;
+                }
+            });
+            if (changed.get()) {
+                requests.put(networkId, requestsForNetwork);
+            }
+            return firstRequest;
+        }
+
+        private void remove(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean removedLast = removeInternal(networkId, request);
+            PacketStoreDelegate delegate = delegateMap.get(networkId);
+            if (removedLast.get() && delegate != null) {
+                // The instance that removes the last request will remove from all devices
+                delegate.cancelPackets(request);
+            }
+        }
+
+        private AtomicBoolean removeInternal(NetworkId networkId, PacketRequest request) {
+            AtomicBoolean removedLast = new AtomicBoolean(false);
+            AtomicBoolean changed = new AtomicBoolean(true);
+            Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+            requestsForNetwork.computeIfPresent(key(request), (s, existingRequests) -> {
+                // Reset to false just in case this is a retry due to
+                // ConcurrentModificationException
+                removedLast.set(false);
+                if (existingRequests.contains(request)) {
+                    Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+                    newRequests.remove(request);
+                    if (newRequests.size() > 0) {
+                        return ImmutableSet.copyOf(newRequests);
+                    } else {
+                        removedLast.set(true);
+                        return null;
+                    }
+                } else {
+                    changed.set(false);
+                    return existingRequests;
+                }
+            });
+            if (changed.get()) {
+                requests.put(networkId, requestsForNetwork);
+            }
+            return removedLast;
+        }
+
+        private List<PacketRequest> requests(NetworkId networkId) {
+            Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+            List<PacketRequest> list = Lists.newArrayList();
+            requestsForNetwork.values().forEach(v -> list.addAll(v));
+            list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+            return list;
+        }
+
+        /*
+         * Gets PacketRequests for specified networkId.
+         */
+        private Map<RequestKey, Set<PacketRequest>> getMap(NetworkId networkId) {
+            return requests.computeIfAbsent(networkId, networkId1 -> {
+                        log.debug("Creating new map for {}", networkId1);
+                        Map newMap = Maps.newHashMap();
+                        return newMap;
+                    });
+        }
+    }
+
+    /**
+     * Creates a new request key from a packet request.
+     *
+     * @param request packet request
+     * @return request key
+     */
+    private static RequestKey key(PacketRequest request) {
+        return new RequestKey(request.selector(), request.priority());
+    }
+
+    /**
+     * Key of a packet request.
+     */
+    private static final class RequestKey {
+        private final TrafficSelector selector;
+        private final PacketPriority priority;
+
+        private RequestKey(TrafficSelector selector, PacketPriority priority) {
+            this.selector = selector;
+            this.priority = priority;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(selector, priority);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other == this) {
+                return true;
+            }
+
+            if (!(other instanceof RequestKey)) {
+                return false;
+            }
+
+            RequestKey that = (RequestKey) other;
+
+            return Objects.equals(selector, that.selector) &&
+                    Objects.equals(priority, that.priority);
+        }
+    }
+
+    private static OutboundPacketWrapper wrapper(NetworkId networkId, OutboundPacket outboundPacket) {
+        return new OutboundPacketWrapper(networkId, outboundPacket);
+    }
+
+    /*
+     * OutboundPacket in
+     */
+    private static final class OutboundPacketWrapper {
+        private NetworkId networkId;
+        private OutboundPacket outboundPacket;
+
+        private OutboundPacketWrapper(NetworkId networkId, OutboundPacket outboundPacket) {
+            this.networkId = networkId;
+            this.outboundPacket = outboundPacket;
+        }
+
+    }
+
+    /**
+     * Sets thread pool size of message handler.
+     *
+     * @param poolSize
+     */
+    private void setMessageHandlerThreadPoolSize(int poolSize) {
+        checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+        messageHandlerThreadPoolSize = poolSize;
+    }
+
+    /**
+     * Restarts thread pool of message handler.
+     */
+    private void restartMessageHandlerThreadPool() {
+        ExecutorService prevExecutor = messageHandlingExecutor;
+        messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
+                                                     groupedThreads("DistPktStore", "messageHandling-%d", log));
+        prevExecutor.shutdown();
+    }
+
+    /**
+     * Gets current thread pool size of message handler.
+     *
+     * @return messageHandlerThreadPoolSize
+     */
+    private int getMessageHandlerThreadPoolSize() {
+        return messageHandlerThreadPoolSize;
+    }
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java
index 01e8a9d..84caf80 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java
@@ -118,8 +118,10 @@
     @Override
     public List<PacketRequest> existingRequests(NetworkId networkId) {
         List<PacketRequest> list = Lists.newArrayList();
-        requests.get(networkId).values().forEach(list::addAll);
-        list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+        if (requests.get(networkId) != null) {
+            requests.get(networkId).values().forEach(list::addAll);
+            list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+        }
         return list;
     }
 }