ONOS-6096 initial impl of dist. virtual packet store; virtual PacketRequest CLI; PacketRequest codec
Change-Id: Iea0a159a977701685c4487e806b26c85a1fcc1a5
diff --git a/cli/src/main/java/org/onosproject/cli/net/vnet/VirtualNetworkPacketRequestCommand.java b/cli/src/main/java/org/onosproject/cli/net/vnet/VirtualNetworkPacketRequestCommand.java
new file mode 100644
index 0000000..4cd1c98
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/vnet/VirtualNetworkPacketRequestCommand.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.cli.net.vnet;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.packet.Ip6Address;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.VlanId;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cli.net.EthType;
+import org.onosproject.cli.net.ExtHeader;
+import org.onosproject.cli.net.Icmp6Code;
+import org.onosproject.cli.net.Icmp6Type;
+import org.onosproject.cli.net.IpProtocol;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+import org.onosproject.net.packet.PacketService;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+/**
+ * Tests virtual network packet requests.
+ */
+@Command(scope = "onos", name = "vnet-packet",
+ description = "Tests virtual network packet requests")
+public class VirtualNetworkPacketRequestCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "command",
+ description = "Command name (requestPackets|getRequests|cancelPackets)",
+ required = true, multiValued = false)
+ private String command = null;
+
+ @Argument(index = 1, name = "networkId", description = "Network ID",
+ required = true, multiValued = false)
+ private Long networkId = null;
+
+ @Option(name = "--deviceId", description = "Device ID",
+ required = false, multiValued = false)
+ private String deviceIdString = null;
+
+ // Traffic selector
+ @Option(name = "-s", aliases = "--ethSrc", description = "Source MAC Address",
+ required = false, multiValued = false)
+ private String srcMacString = null;
+
+ @Option(name = "-d", aliases = "--ethDst", description = "Destination MAC Address",
+ required = false, multiValued = false)
+ private String dstMacString = null;
+
+ @Option(name = "-t", aliases = "--ethType", description = "Ethernet Type",
+ required = false, multiValued = false)
+ private String ethTypeString = null;
+
+ @Option(name = "-v", aliases = "--vlan", description = "VLAN ID",
+ required = false, multiValued = false)
+ private String vlanString = null;
+
+ @Option(name = "--ipProto", description = "IP Protocol",
+ required = false, multiValued = false)
+ private String ipProtoString = null;
+
+ @Option(name = "--ipSrc", description = "Source IP Prefix",
+ required = false, multiValued = false)
+ private String srcIpString = null;
+
+ @Option(name = "--ipDst", description = "Destination IP Prefix",
+ required = false, multiValued = false)
+ private String dstIpString = null;
+
+ @Option(name = "--fLabel", description = "IPv6 Flow Label",
+ required = false, multiValued = false)
+ private String fLabelString = null;
+
+ @Option(name = "--icmp6Type", description = "ICMPv6 Type",
+ required = false, multiValued = false)
+ private String icmp6TypeString = null;
+
+ @Option(name = "--icmp6Code", description = "ICMPv6 Code",
+ required = false, multiValued = false)
+ private String icmp6CodeString = null;
+
+ @Option(name = "--ndTarget", description = "IPv6 Neighbor Discovery Target Address",
+ required = false, multiValued = false)
+ private String ndTargetString = null;
+
+ @Option(name = "--ndSLL", description = "IPv6 Neighbor Discovery Source Link-Layer",
+ required = false, multiValued = false)
+ private String ndSllString = null;
+
+ @Option(name = "--ndTLL", description = "IPv6 Neighbor Discovery Target Link-Layer",
+ required = false, multiValued = false)
+ private String ndTllString = null;
+
+ @Option(name = "--tcpSrc", description = "Source TCP Port",
+ required = false, multiValued = false)
+ private String srcTcpString = null;
+
+ @Option(name = "--tcpDst", description = "Destination TCP Port",
+ required = false, multiValued = false)
+ private String dstTcpString = null;
+
+ @Option(name = "--extHdr", description = "IPv6 Extension Header Pseudo-field",
+ required = false, multiValued = true)
+ private List<String> extHdrStringList = null;
+
+ @Override
+ protected void execute() {
+ VirtualNetworkService service = get(VirtualNetworkService.class);
+ PacketService virtualPacketService = service.get(NetworkId.networkId(networkId), PacketService.class);
+
+ if (command == null) {
+ print("Command is not defined");
+ return;
+ }
+
+ if (command.equals("getRequests")) {
+ getRequests(virtualPacketService);
+ return;
+ }
+
+ TrafficSelector selector = buildTrafficSelector();
+ PacketPriority packetPriority = PacketPriority.CONTROL; //TODO allow user to specify
+ Optional<DeviceId> optionalDeviceId = null;
+ if (!isNullOrEmpty(deviceIdString)) {
+ optionalDeviceId = Optional.of(DeviceId.deviceId(deviceIdString));
+ }
+
+ if (command.equals("requestPackets")) {
+ if (optionalDeviceId != null) {
+ virtualPacketService.requestPackets(selector, packetPriority, appId(), optionalDeviceId);
+ } else {
+ virtualPacketService.requestPackets(selector, packetPriority, appId());
+ }
+ print("Virtual packet requested:\n%s", selector);
+ return;
+ }
+
+ if (command.equals("cancelPackets")) {
+ if (optionalDeviceId != null) {
+ virtualPacketService.cancelPackets(selector, packetPriority, appId(), optionalDeviceId);
+ } else {
+ virtualPacketService.cancelPackets(selector, packetPriority, appId());
+ }
+ print("Virtual packet cancelled:\n%s", selector);
+ return;
+ }
+
+ print("Unsupported command %s", command);
+ }
+
+ private void getRequests(PacketService packetService) {
+ List<PacketRequest> packetRequests = packetService.getRequests();
+ if (outputJson()) {
+ print("%s", json(packetRequests));
+ } else {
+ packetRequests.forEach(packetRequest -> print(packetRequest.toString()));
+ }
+ }
+
+ private JsonNode json(List<PacketRequest> packetRequests) {
+ ObjectMapper mapper = new ObjectMapper();
+ ArrayNode result = mapper.createArrayNode();
+ packetRequests.forEach(packetRequest ->
+ result.add(jsonForEntity(packetRequest, PacketRequest.class)));
+ return result;
+ }
+
+ /**
+ * Constructs a traffic selector based on the command line arguments
+ * presented to the command.
+ * @return traffic selector
+ */
+ private TrafficSelector buildTrafficSelector() {
+ IpPrefix srcIpPrefix = null;
+ IpPrefix dstIpPrefix = null;
+
+ TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder();
+
+ if (!isNullOrEmpty(srcIpString)) {
+ srcIpPrefix = IpPrefix.valueOf(srcIpString);
+ if (srcIpPrefix.isIp4()) {
+ selectorBuilder.matchIPSrc(srcIpPrefix);
+ } else {
+ selectorBuilder.matchIPv6Src(srcIpPrefix);
+ }
+ }
+
+ if (!isNullOrEmpty(dstIpString)) {
+ dstIpPrefix = IpPrefix.valueOf(dstIpString);
+ if (dstIpPrefix.isIp4()) {
+ selectorBuilder.matchIPDst(dstIpPrefix);
+ } else {
+ selectorBuilder.matchIPv6Dst(dstIpPrefix);
+ }
+ }
+
+ if ((srcIpPrefix != null) && (dstIpPrefix != null) &&
+ (srcIpPrefix.version() != dstIpPrefix.version())) {
+ // ERROR: IP src/dst version mismatch
+ throw new IllegalArgumentException(
+ "IP source and destination version mismatch");
+ }
+
+ //
+ // Set the default EthType based on the IP version if the matching
+ // source or destination IP prefixes.
+ //
+ Short ethType = null;
+ if ((srcIpPrefix != null) && srcIpPrefix.isIp6()) {
+ ethType = EthType.IPV6.value();
+ }
+ if ((dstIpPrefix != null) && dstIpPrefix.isIp6()) {
+ ethType = EthType.IPV6.value();
+ }
+ if (!isNullOrEmpty(ethTypeString)) {
+ ethType = EthType.parseFromString(ethTypeString);
+ }
+ if (ethType != null) {
+ selectorBuilder.matchEthType(ethType);
+ }
+ if (!isNullOrEmpty(vlanString)) {
+ selectorBuilder.matchVlanId(VlanId.vlanId(Short.parseShort(vlanString)));
+ }
+ if (!isNullOrEmpty(srcMacString)) {
+ selectorBuilder.matchEthSrc(MacAddress.valueOf(srcMacString));
+ }
+
+ if (!isNullOrEmpty(dstMacString)) {
+ selectorBuilder.matchEthDst(MacAddress.valueOf(dstMacString));
+ }
+
+ if (!isNullOrEmpty(ipProtoString)) {
+ short ipProtoShort = IpProtocol.parseFromString(ipProtoString);
+ selectorBuilder.matchIPProtocol((byte) ipProtoShort);
+ }
+
+ if (!isNullOrEmpty(fLabelString)) {
+ selectorBuilder.matchIPv6FlowLabel(Integer.parseInt(fLabelString));
+ }
+
+ if (!isNullOrEmpty(icmp6TypeString)) {
+ byte icmp6Type = Icmp6Type.parseFromString(icmp6TypeString);
+ selectorBuilder.matchIcmpv6Type(icmp6Type);
+ }
+
+ if (!isNullOrEmpty(icmp6CodeString)) {
+ byte icmp6Code = Icmp6Code.parseFromString(icmp6CodeString);
+ selectorBuilder.matchIcmpv6Code(icmp6Code);
+ }
+
+ if (!isNullOrEmpty(ndTargetString)) {
+ selectorBuilder.matchIPv6NDTargetAddress(Ip6Address.valueOf(ndTargetString));
+ }
+
+ if (!isNullOrEmpty(ndSllString)) {
+ selectorBuilder.matchIPv6NDSourceLinkLayerAddress(MacAddress.valueOf(ndSllString));
+ }
+
+ if (!isNullOrEmpty(ndTllString)) {
+ selectorBuilder.matchIPv6NDTargetLinkLayerAddress(MacAddress.valueOf(ndTllString));
+ }
+
+ if (!isNullOrEmpty(srcTcpString)) {
+ selectorBuilder.matchTcpSrc(TpPort.tpPort(Integer.parseInt(srcTcpString)));
+ }
+
+ if (!isNullOrEmpty(dstTcpString)) {
+ selectorBuilder.matchTcpDst(TpPort.tpPort(Integer.parseInt(dstTcpString)));
+ }
+
+ if (extHdrStringList != null) {
+ short extHdr = 0;
+ for (String extHdrString : extHdrStringList) {
+ extHdr = (short) (extHdr | ExtHeader.parseFromString(extHdrString));
+ }
+ selectorBuilder.matchIPv6ExthdrFlags(extHdr);
+ }
+
+ return selectorBuilder.build();
+ }
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 7b990c1..e5bf7fb 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -876,6 +876,16 @@
</completers>
</command>
<command>
+ <action class="org.onosproject.cli.net.vnet.VirtualNetworkPacketRequestCommand"/>
+ <optional-completers>
+ <entry key="-t" value-ref="ethTypeCompleter"/>
+ <entry key="--ipProto" value-ref="ipProtocolCompleter"/>
+ <entry key="--icmp6Type" value-ref="Icmp6TypeCompleter"/>
+ <entry key="--icmp6Code" value-ref="Icmp6CodeCompleter"/>
+ <entry key="--extHdr" value-ref="ExtHeaderCompleter"/>
+ </optional-completers>
+ </command>
+ <command>
<action class="org.onosproject.cli.net.DpisListCommand"/>
</command>
diff --git a/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java b/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
index 67066a0..44bdacd 100644
--- a/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
+++ b/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
@@ -82,6 +82,7 @@
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.Meter;
import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.region.Region;
import org.onosproject.net.statistic.Load;
import org.onosproject.net.topology.Topology;
@@ -169,6 +170,7 @@
registerCodec(FlowStatInfo.class, new FlowStatInfoCodec());
registerCodec(FilteredConnectPoint.class, new FilteredConnectPointCodec());
registerCodec(TransportEndpointDescription.class, new TransportEndpointDescriptionCodec());
+ registerCodec(PacketRequest.class, new PacketRequestCodec());
log.info("Started");
}
diff --git a/core/common/src/main/java/org/onosproject/codec/impl/PacketRequestCodec.java b/core/common/src/main/java/org/onosproject/codec/impl/PacketRequestCodec.java
new file mode 100644
index 0000000..a66d827
--- /dev/null
+++ b/core/common/src/main/java/org/onosproject/codec/impl/PacketRequestCodec.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.codec.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.DefaultPacketRequest;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.nullIsIllegal;
+
+/**
+ * Codec for the PacketRequest class.
+ */
+
+// TODO: Needs unit test
+
+public class PacketRequestCodec extends JsonCodec<PacketRequest> {
+
+ // JSON field names
+ static final String TRAFFIC_SELECTOR = "selector";
+ static final String PRIORITY = "priority";
+ static final String APP_ID = "appId";
+ static final String NODE_ID = "nodeId";
+ static final String DEVICE_ID = "deviceId";
+
+ private static final String NULL_OBJECT_MSG = "PacketRequest cannot be null";
+ private static final String MISSING_MEMBER_MSG = " member is required in PacketRequest";
+ public static final String REST_APP_ID = "org.onosproject.rest";
+
+ @Override
+ public ObjectNode encode(PacketRequest packetRequest, CodecContext context) {
+ checkNotNull(packetRequest, NULL_OBJECT_MSG);
+
+ final JsonCodec<TrafficSelector> trafficSelectorCodec =
+ context.codec(TrafficSelector.class);
+ final ObjectNode result = context.mapper().createObjectNode()
+ .put(NODE_ID, packetRequest.nodeId().toString())
+ .put(PRIORITY, packetRequest.priority().name())
+ .put(APP_ID, packetRequest.appId().toString());
+ if (packetRequest.deviceId().isPresent()) {
+ result.put(DEVICE_ID, packetRequest.deviceId().get().toString());
+ }
+
+ result.set(TRAFFIC_SELECTOR, trafficSelectorCodec.encode(packetRequest.selector(), context));
+
+ return result;
+ }
+
+ @Override
+ public PacketRequest decode(ObjectNode json, CodecContext context) {
+ if (json == null || !json.isObject()) {
+ return null;
+ }
+
+ final JsonCodec<TrafficSelector> trafficSelectorCodec =
+ context.codec(TrafficSelector.class);
+ TrafficSelector trafficSelector = trafficSelectorCodec.decode(
+ get(json, TRAFFIC_SELECTOR), context);
+ NodeId nodeId = NodeId.nodeId(extractMember(NODE_ID, json));
+ PacketPriority priority = PacketPriority.valueOf(extractMember(PRIORITY, json));
+
+ CoreService coreService = context.getService(CoreService.class);
+ // TODO check appId (currently hardcoded - should it be read from json node?)
+ ApplicationId appId = coreService.registerApplication(REST_APP_ID);
+
+ DeviceId deviceId = null;
+ JsonNode node = json.get(DEVICE_ID);
+ if (node != null) {
+ deviceId = DeviceId.deviceId(node.asText());
+ }
+
+ return new DefaultPacketRequest(trafficSelector, priority, appId, nodeId, Optional.ofNullable(deviceId));
+ }
+
+ /**
+ * Extract member from JSON ObjectNode.
+ *
+ * @param key key for which value is needed
+ * @param json JSON ObjectNode
+ * @return member value
+ */
+ private String extractMember(String key, ObjectNode json) {
+ return nullIsIllegal(json.get(key), key + MISSING_MEMBER_MSG).asText();
+ }
+}
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
index 46b717e..44395ef 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
@@ -66,6 +66,7 @@
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.FakeIntentManager;
@@ -927,6 +928,7 @@
.add(VirtualNetworkFlowObjectiveStore.class, new SimpleVirtualFlowObjectiveStore());
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowRuleService.class);
+ validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowObjectiveService.class);
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), PacketService.class);
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), GroupService.class);
}
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
index 1cedd4a..184a30e 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
@@ -50,12 +50,12 @@
import org.onosproject.net.NetTestTools;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.Objective;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.intent.FakeIntentManager;
import org.onosproject.net.intent.TestableIntentService;
import org.onosproject.net.packet.DefaultOutboundPacket;
@@ -80,15 +80,18 @@
import static org.onosproject.net.packet.PacketPriority.CONTROL;
import static org.onosproject.net.packet.PacketPriority.REACTIVE;
+/**
+ * Junit tests for VirtualNetworkPacketManager using SimpleVirtualPacketStore.
+ */
public class VirtualNetworkPacketManagerTest extends VirtualNetworkTestUtil {
private static final int PROCESSOR_PRIORITY = 1;
- private VirtualNetworkManager manager;
- private DistributedVirtualNetworkStore virtualNetworkManagerStore;
+ protected VirtualNetworkManager manager;
+ protected DistributedVirtualNetworkStore virtualNetworkManagerStore;
private CoreService coreService = new TestCoreService();
private TestableIntentService intentService = new FakeIntentManager();
- private TestServiceDirectory testDirectory;
+ protected TestServiceDirectory testDirectory;
private EventDeliveryService eventDeliveryService;
private VirtualProviderManager providerRegistryService;
@@ -96,9 +99,9 @@
private VirtualNetwork vnet2;
private VirtualPacketProvider provider = new TestPacketProvider();
- private VirtualNetworkPacketStore packetStore = new SimpleVirtualPacketStore();
+ protected VirtualNetworkPacketStore packetStore = new SimpleVirtualPacketStore();
- private VirtualNetworkPacketManager packetManager1;
+ protected VirtualNetworkPacketManager packetManager1;
private VirtualNetworkPacketManager packetManager2;
private ApplicationId appId = new TestApplicationId("VirtualPacketManagerTest");
@@ -106,13 +109,13 @@
private VirtualFlowRuleProvider flowRuleProvider = new TestFlowRuleProvider();
private SimpleVirtualFlowRuleStore flowRuleStore;
private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
+ protected StorageService storageService = new TestStorageService();
@Before
public void setUp() throws TestUtils.TestUtilsException {
virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
- StorageService storageService = new TestStorageService();
TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
virtualNetworkManagerStore.activate();
@@ -293,7 +296,7 @@
testFlowObjectiveService.validateObjectives(vnet1Devices, ts, CONTROL, REMOVE);
}
- private static OutboundPacket emittedPacket = null;
+ protected OutboundPacket emittedPacket = null;
/**
* Core service test class.
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerWithDistStoreTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerWithDistStoreTest.java
new file mode 100644
index 0000000..0159ae2
--- /dev/null
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerWithDistStoreTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.store.virtual.impl.DistributedVirtualPacketStore;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * Junit tests for VirtualNetworkPacketManager using DistributedVirtualPacketStore..
+ * This test class extends VirtualNetworkPacketManagerTest - all the tests defined in
+ * VirtualNetworkPacketManagerTest will run using DistributedVirtualPacketStore.
+ */
+public class VirtualNetworkPacketManagerWithDistStoreTest extends VirtualNetworkPacketManagerTest {
+
+ private DistributedVirtualPacketStore distStore;
+ private ClusterService clusterService = new ClusterServiceAdapter();
+
+ @Before
+ public void setUp() throws TestUtils.TestUtilsException {
+ setUpDistPacketStore();
+ super.setUp();
+ TestUtils.setField(packetManager1, "storageService", storageService);
+ }
+
+ private void setUpDistPacketStore() throws TestUtils.TestUtilsException {
+ distStore = new DistributedVirtualPacketStore();
+ TestUtils.setField(distStore, "cfgService", new ComponentConfigAdapter());
+ TestUtils.setField(distStore, "storageService", storageService);
+ TestUtils.setField(distStore, "clusterService", clusterService);
+ TestUtils.setField(distStore, "communicationService", new TestClusterCommunicationService());
+ TestUtils.setField(distStore, "mastershipService", new TestMastershipService());
+
+ distStore.activate(new ComponentContextAdapter());
+ packetStore = distStore; // super.setUp() will cause Distributed store to be used.
+ }
+
+ @After
+ public void tearDown() {
+ distStore.deactivate();
+ }
+
+ @Override
+ @Test
+ @Ignore("Ignore until there is MastershipService support for virtual devices")
+ public void emitTest() {
+ super.emitTest();
+ }
+
+ /**
+ * Tests the correct usage of emit() for a outbound packet - master of packet's
+ * sendThrough is not local node.
+ */
+ @Test
+ @Ignore("Ignore until there is MastershipService support for virtual devices")
+ public void emit2Test() {
+ OutboundPacket packet =
+ new DefaultOutboundPacket(VDID2, DefaultTrafficTreatment.emptyTreatment(), ByteBuffer.allocate(5));
+ packetManager1.emit(packet);
+ assertNull("Packet should not have been emmitted", emittedPacket);
+ }
+
+ private final class TestMastershipService extends MastershipServiceAdapter {
+ @Override
+ public NodeId getMasterFor(DeviceId deviceId) {
+ if (VDID1.equals(deviceId)) {
+ return clusterService.getLocalNode().id();
+ }
+ return new NodeId("abc");
+ }
+ }
+
+ private final class TestClusterCommunicationService extends ClusterCommunicationServiceAdapter {
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return new CompletableFuture<>();
+ }
+ }
+
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java
new file mode 100644
index 0000000..18a8cc3
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualPacketStore.java
@@ -0,0 +1,414 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketEvent;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketRequest;
+import org.onosproject.net.packet.PacketStoreDelegate;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.packet.PacketEvent.Type.EMIT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed virtual packet store implementation allowing packets to be sent to
+ * remote instances. Implementation is based on DistributedPacketStore class.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class DistributedVirtualPacketStore
+ extends AbstractVirtualStore<PacketEvent, PacketStoreDelegate>
+ implements VirtualNetworkPacketStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService communicationService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
+ private PacketRequestTracker tracker;
+
+ private static final MessageSubject PACKET_OUT_SUBJECT =
+ new MessageSubject("virtual-packet-out");
+
+ private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
+
+ private ExecutorService messageHandlingExecutor;
+
+ private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Size of thread pool to assign message handler")
+ private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ cfgService.registerProperties(getClass());
+
+ modified(context);
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ messageHandlerThreadPoolSize,
+ groupedThreads("onos/store/packet", "message-handlers", log));
+
+ communicationService.<OutboundPacketWrapper>addSubscriber(PACKET_OUT_SUBJECT,
+ SERIALIZER::decode,
+ packetWrapper -> notifyDelegate(packetWrapper.networkId,
+ new PacketEvent(EMIT,
+ packetWrapper.outboundPacket)),
+ messageHandlingExecutor);
+
+ tracker = new PacketRequestTracker();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ cfgService.unregisterProperties(getClass(), false);
+ communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
+ messageHandlingExecutor.shutdown();
+ tracker = null;
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ int newMessageHandlerThreadPoolSize;
+
+ try {
+ String s = get(properties, "messageHandlerThreadPoolSize");
+
+ newMessageHandlerThreadPoolSize =
+ isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException e) {
+ log.warn(e.getMessage());
+ newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+ }
+
+ // Any change in the following parameters implies thread pool restart
+ if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+ setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+ restartMessageHandlerThreadPool();
+ }
+
+ log.info(FORMAT, messageHandlerThreadPoolSize);
+ }
+
+ @Override
+ public void emit(NetworkId networkId, OutboundPacket packet) {
+ NodeId myId = clusterService.getLocalNode().id();
+ // TODO revive this when there is MastershipService support for virtual devices
+// NodeId master = mastershipService.getMasterFor(packet.sendThrough());
+//
+// if (master == null) {
+// log.warn("No master found for {}", packet.sendThrough());
+// return;
+// }
+//
+// log.debug("master {} found for {}", myId, packet.sendThrough());
+// if (myId.equals(master)) {
+// notifyDelegate(networkId, new PacketEvent(EMIT, packet));
+// return;
+// }
+//
+// communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+// .whenComplete((r, error) -> {
+// if (error != null) {
+// log.warn("Failed to send packet-out to {}", master, error);
+// }
+// });
+ }
+
+ @Override
+ public void requestPackets(NetworkId networkId, PacketRequest request) {
+ tracker.add(networkId, request);
+
+ }
+
+ @Override
+ public void cancelPackets(NetworkId networkId, PacketRequest request) {
+ tracker.remove(networkId, request);
+ }
+
+ @Override
+ public List<PacketRequest> existingRequests(NetworkId networkId) {
+ return tracker.requests(networkId);
+ }
+
+ private final class PacketRequestTracker {
+
+ private ConsistentMap<NetworkId, Map<RequestKey, Set<PacketRequest>>> distRequests;
+ private Map<NetworkId, Map<RequestKey, Set<PacketRequest>>> requests;
+
+ private PacketRequestTracker() {
+ distRequests = storageService.<NetworkId, Map<RequestKey, Set<PacketRequest>>>consistentMapBuilder()
+ .withName("onos-virtual-packet-requests")
+ .withSerializer(Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(RequestKey.class)
+ .register(NetworkId.class)
+ .build()))
+ .build();
+ requests = distRequests.asJavaMap();
+ }
+
+ private void add(NetworkId networkId, PacketRequest request) {
+ AtomicBoolean firstRequest = addInternal(networkId, request);
+ PacketStoreDelegate delegate = delegateMap.get(networkId);
+ if (firstRequest.get() && delegate != null) {
+ // The instance that makes the first request will push to all devices
+ delegate.requestPackets(request);
+ }
+ }
+
+ private AtomicBoolean addInternal(NetworkId networkId, PacketRequest request) {
+ AtomicBoolean firstRequest = new AtomicBoolean(false);
+ AtomicBoolean changed = new AtomicBoolean(true);
+ Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+ requestsForNetwork.compute(key(request), (s, existingRequests) -> {
+ // Reset to false just in case this is a retry due to
+ // ConcurrentModificationException
+ firstRequest.set(false);
+ if (existingRequests == null) {
+ firstRequest.set(true);
+ return ImmutableSet.of(request);
+ } else if (!existingRequests.contains(request)) {
+ firstRequest.set(true);
+ return ImmutableSet.<PacketRequest>builder()
+ .addAll(existingRequests)
+ .add(request)
+ .build();
+ } else {
+ changed.set(false);
+ return existingRequests;
+ }
+ });
+ if (changed.get()) {
+ requests.put(networkId, requestsForNetwork);
+ }
+ return firstRequest;
+ }
+
+ private void remove(NetworkId networkId, PacketRequest request) {
+ AtomicBoolean removedLast = removeInternal(networkId, request);
+ PacketStoreDelegate delegate = delegateMap.get(networkId);
+ if (removedLast.get() && delegate != null) {
+ // The instance that removes the last request will remove from all devices
+ delegate.cancelPackets(request);
+ }
+ }
+
+ private AtomicBoolean removeInternal(NetworkId networkId, PacketRequest request) {
+ AtomicBoolean removedLast = new AtomicBoolean(false);
+ AtomicBoolean changed = new AtomicBoolean(true);
+ Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+ requestsForNetwork.computeIfPresent(key(request), (s, existingRequests) -> {
+ // Reset to false just in case this is a retry due to
+ // ConcurrentModificationException
+ removedLast.set(false);
+ if (existingRequests.contains(request)) {
+ Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+ newRequests.remove(request);
+ if (newRequests.size() > 0) {
+ return ImmutableSet.copyOf(newRequests);
+ } else {
+ removedLast.set(true);
+ return null;
+ }
+ } else {
+ changed.set(false);
+ return existingRequests;
+ }
+ });
+ if (changed.get()) {
+ requests.put(networkId, requestsForNetwork);
+ }
+ return removedLast;
+ }
+
+ private List<PacketRequest> requests(NetworkId networkId) {
+ Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
+ List<PacketRequest> list = Lists.newArrayList();
+ requestsForNetwork.values().forEach(v -> list.addAll(v));
+ list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+ return list;
+ }
+
+ /*
+ * Gets PacketRequests for specified networkId.
+ */
+ private Map<RequestKey, Set<PacketRequest>> getMap(NetworkId networkId) {
+ return requests.computeIfAbsent(networkId, networkId1 -> {
+ log.debug("Creating new map for {}", networkId1);
+ Map newMap = Maps.newHashMap();
+ return newMap;
+ });
+ }
+ }
+
+ /**
+ * Creates a new request key from a packet request.
+ *
+ * @param request packet request
+ * @return request key
+ */
+ private static RequestKey key(PacketRequest request) {
+ return new RequestKey(request.selector(), request.priority());
+ }
+
+ /**
+ * Key of a packet request.
+ */
+ private static final class RequestKey {
+ private final TrafficSelector selector;
+ private final PacketPriority priority;
+
+ private RequestKey(TrafficSelector selector, PacketPriority priority) {
+ this.selector = selector;
+ this.priority = priority;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(selector, priority);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+
+ if (!(other instanceof RequestKey)) {
+ return false;
+ }
+
+ RequestKey that = (RequestKey) other;
+
+ return Objects.equals(selector, that.selector) &&
+ Objects.equals(priority, that.priority);
+ }
+ }
+
+ private static OutboundPacketWrapper wrapper(NetworkId networkId, OutboundPacket outboundPacket) {
+ return new OutboundPacketWrapper(networkId, outboundPacket);
+ }
+
+ /*
+ * OutboundPacket in
+ */
+ private static final class OutboundPacketWrapper {
+ private NetworkId networkId;
+ private OutboundPacket outboundPacket;
+
+ private OutboundPacketWrapper(NetworkId networkId, OutboundPacket outboundPacket) {
+ this.networkId = networkId;
+ this.outboundPacket = outboundPacket;
+ }
+
+ }
+
+ /**
+ * Sets thread pool size of message handler.
+ *
+ * @param poolSize
+ */
+ private void setMessageHandlerThreadPoolSize(int poolSize) {
+ checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+ messageHandlerThreadPoolSize = poolSize;
+ }
+
+ /**
+ * Restarts thread pool of message handler.
+ */
+ private void restartMessageHandlerThreadPool() {
+ ExecutorService prevExecutor = messageHandlingExecutor;
+ messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
+ groupedThreads("DistPktStore", "messageHandling-%d", log));
+ prevExecutor.shutdown();
+ }
+
+ /**
+ * Gets current thread pool size of message handler.
+ *
+ * @return messageHandlerThreadPoolSize
+ */
+ private int getMessageHandlerThreadPoolSize() {
+ return messageHandlerThreadPoolSize;
+ }
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java
index 01e8a9d..84caf80 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualPacketStore.java
@@ -118,8 +118,10 @@
@Override
public List<PacketRequest> existingRequests(NetworkId networkId) {
List<PacketRequest> list = Lists.newArrayList();
- requests.get(networkId).values().forEach(list::addAll);
- list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+ if (requests.get(networkId) != null) {
+ requests.get(networkId).values().forEach(list::addAll);
+ list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+ }
return list;
}
}