Initial implementation of svc IP to pod IP translation using DNAT
Change-Id: I6e2f6936636e929ad60150cc67aa6316eef32911
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
index 763bae6..647c8e7 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
@@ -70,6 +70,7 @@
public static final int CT_TABLE = 45;
public static final int ACL_RECIRC_TABLE = 43;
public static final int JUMP_TABLE = 50;
+ public static final int NAT_TABLE = 51;
public static final int ROUTING_TABLE = 60;
public static final int STAT_OUTBOUND_TABLE = 70;
public static final int VTAP_OUTBOUND_TABLE = 71;
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
new file mode 100644
index 0000000..fc935c0
--- /dev/null
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.api;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupDescription.Type;
+
+import java.util.List;
+
+/**
+ * Service for setting group rules.
+ */
+public interface K8sGroupRuleService {
+
+ /**
+ * Configures the group table rule.
+ *
+ * @param appId application ID
+ * @param deviceId device ID
+ * @param groupId group ID
+ * @param type group type
+ * @param buckets a list of group buckets
+ * @param install true for rule addition, false for rule removal
+ */
+ void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
+ Type type, List<GroupBucket> buckets, boolean install);
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java
index 6b4c997..018cfb2 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java
@@ -34,6 +34,7 @@
public class K8sEndpointsListCommand extends AbstractShellCommand {
private static final String FORMAT = "%-50s%-50s%-20s";
+ private static final String PORT_PROTOCOL_SEPARATOR = "/";
@Override
protected void doExecute() {
@@ -46,17 +47,18 @@
for (Endpoints endpoints : endpointses) {
List<String> ips = Lists.newArrayList();
- List<Integer> ports = Lists.newArrayList();
+ List<String> portWithProtocol = Lists.newArrayList();
endpoints.getSubsets().forEach(e -> {
e.getAddresses().forEach(a -> ips.add(a.getIp()));
- e.getPorts().forEach(p -> ports.add(p.getPort()));
+ e.getPorts().forEach(p -> portWithProtocol.add(p.getPort() +
+ PORT_PROTOCOL_SEPARATOR + p.getProtocol()));
});
print(FORMAT,
endpoints.getMetadata().getName(),
ips.isEmpty() ? "" : ips,
- ports.isEmpty() ? "" : ports);
+ portWithProtocol.isEmpty() ? "" : portWithProtocol);
}
}
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java
index c1bb526..a144397 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java
@@ -41,7 +41,7 @@
List<Pod> pods = Lists.newArrayList(service.pods());
pods.sort(Comparator.comparing(p -> p.getMetadata().getName()));
- print(FORMAT, "Name", "Namespace", "IP", "Containers");
+ print(FORMAT, "Name", "Namespace", "IP Address", "Containers");
for (Pod pod : pods) {
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java
index 28cd8a7..e344971 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java
@@ -59,7 +59,7 @@
if (outputJson()) {
print("%s", json(ports));
} else {
- print(FORMAT, "ID", "Network", "MAC", "Fixed IPs");
+ print(FORMAT, "ID", "Network", "MAC Address", "Fixed IPs");
for (K8sPort port: ports) {
K8sNetwork k8sNet = service.network(port.networkId());
print(FORMAT, port.portId(),
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java
index 800a1ab..ade0b01 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java
@@ -33,6 +33,7 @@
public class K8sServiceListCommand extends AbstractShellCommand {
private static final String FORMAT = "%-50s%-30s%-30s";
+ private static final String PORT_PROTOCOL_SEPARATOR = "/";
@Override
protected void doExecute() {
@@ -45,14 +46,16 @@
for (io.fabric8.kubernetes.api.model.Service svc : services) {
- List<Integer> ports = Lists.newArrayList();
+ List<String> portWithProtocol = Lists.newArrayList();
- svc.getSpec().getPorts().forEach(p -> ports.add(p.getPort()));
+ svc.getSpec().getPorts().forEach(p ->
+ portWithProtocol.add(p.getPort() +
+ PORT_PROTOCOL_SEPARATOR + p.getProtocol()));
print(FORMAT,
svc.getMetadata().getName(),
svc.getSpec().getClusterIP(),
- ports.isEmpty() ? "" : ports);
+ portWithProtocol.isEmpty() ? "" : portWithProtocol);
}
}
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sApiServerProxyHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sApiServerProxyHandler.java
deleted file mode 100644
index ef1acaa..0000000
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sApiServerProxyHandler.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.k8snetworking.impl;
-
-import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.TpPort;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.k8snetworking.api.K8sFlowRuleService;
-import org.onosproject.k8snode.api.K8sNode;
-import org.onosproject.k8snode.api.K8sNodeEvent;
-import org.onosproject.k8snode.api.K8sNodeListener;
-import org.onosproject.k8snode.api.K8sNodeService;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.DefaultTrafficSelector;
-import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.packet.PacketService;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
-import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
-import static org.onosproject.k8snetworking.api.Constants.PRIORITY_TRANSLATION_RULE;
-import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Handles kubernetes API server requests from pods.
- */
-@Component(immediate = true)
-public class K8sApiServerProxyHandler {
- protected final Logger log = getLogger(getClass());
-
- private static final String API_SERVER_CLUSTER_IP = "10.96.0.1";
- private static final int API_SERVER_CLUSTER_PORT = 443;
- private static final String API_SERVER_IP = "10.10.10.1";
- private static final int API_SERVER_PORT = 6443;
- private static final int PREFIX_LENGTH = 32;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected PacketService packetService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected K8sNodeService k8sNodeService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected K8sFlowRuleService k8sFlowRuleService;
-
- private final ExecutorService eventExecutor = newSingleThreadExecutor(
- groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
- private final K8sNodeListener k8sNodeListener = new InternalNodeEventListener();
-
- private ApplicationId appId;
- private NodeId localNodeId;
-
- @Activate
- protected void activate() {
- appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
- localNodeId = clusterService.getLocalNode().id();
- k8sNodeService.addListener(k8sNodeListener);
- leadershipService.runForLeadership(appId.name());
-
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- k8sNodeService.removeListener(k8sNodeListener);
- leadershipService.withdraw(appId.name());
- eventExecutor.shutdown();
-
- log.info("Stopped");
- }
-
- private class InternalNodeEventListener implements K8sNodeListener {
-
- private boolean isRelevantHelper() {
- return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
- }
-
- @Override
- public void event(K8sNodeEvent event) {
- K8sNode k8sNode = event.subject();
- switch (event.type()) {
- case K8S_NODE_COMPLETE:
- eventExecutor.execute(() -> processNodeCompletion(k8sNode));
- break;
- case K8S_NODE_INCOMPLETE:
- eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
- break;
- default:
- break;
- }
- }
-
- private void processNodeCompletion(K8sNode k8sNode) {
- if (!isRelevantHelper()) {
- return;
- }
-
- setRequestTranslationRule(k8sNode, true);
- setResponseTranslationRule(k8sNode, true);
- }
-
- private void processNodeIncompletion(K8sNode k8sNode) {
- if (!isRelevantHelper()) {
- return;
- }
-
- setRequestTranslationRule(k8sNode, false);
- setResponseTranslationRule(k8sNode, false);
- }
-
- /**
- * Installs k8s API server rule for receiving all API request packets.
- *
- * @param k8sNode kubernetes node
- * @param install installation flag
- */
- private void setRequestTranslationRule(K8sNode k8sNode, boolean install) {
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPProtocol(IPv4.PROTOCOL_TCP)
- .matchIPDst(IpPrefix.valueOf(
- IpAddress.valueOf(API_SERVER_CLUSTER_IP), PREFIX_LENGTH))
- .matchTcpDst(TpPort.tpPort(API_SERVER_CLUSTER_PORT))
- .build();
-
- TrafficTreatment treatment = DefaultTrafficTreatment.builder()
- .setIpDst(IpAddress.valueOf(API_SERVER_IP))
- .setTcpDst(TpPort.tpPort(API_SERVER_PORT))
- .setOutput(PortNumber.LOCAL)
- .build();
-
- k8sFlowRuleService.setRule(
- appId,
- k8sNode.intgBridge(),
- selector,
- treatment,
- PRIORITY_TRANSLATION_RULE,
- STAT_OUTBOUND_TABLE,
- install
- );
- }
-
- /**
- * Installs k8s API server rule for receiving all API response packets.
- *
- * @param k8sNode kubernetes node
- * @param install installation flag
- */
- private void setResponseTranslationRule(K8sNode k8sNode, boolean install) {
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPProtocol(IPv4.PROTOCOL_TCP)
- .matchIPSrc(IpPrefix.valueOf(
- IpAddress.valueOf(API_SERVER_IP), PREFIX_LENGTH))
- .matchTcpSrc(TpPort.tpPort(API_SERVER_PORT))
- .build();
-
- TrafficTreatment treatment = DefaultTrafficTreatment.builder()
- .setIpSrc(IpAddress.valueOf(API_SERVER_CLUSTER_IP))
- .setTcpSrc(TpPort.tpPort(API_SERVER_CLUSTER_PORT))
- .transition(FORWARDING_TABLE)
- .build();
-
- k8sFlowRuleService.setRule(
- appId,
- k8sNode.intgBridge(),
- selector,
- treatment,
- PRIORITY_TRANSLATION_RULE,
- STAT_OUTBOUND_TABLE,
- install
- );
- }
- }
-}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
index 9bfd8ea..e62b6a1 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
@@ -230,7 +230,10 @@
// we need JUMP table for bypassing routing table which contains large
// amount of flow rules which might cause performance degradation during
// table lookup
- setupJumpTable(k8sNode);
+ // setupJumpTable(k8sNode);
+
+ // for routing and outbound table transition
+ connectTables(deviceId, ROUTING_TABLE, STAT_OUTBOUND_TABLE);
// for outbound table transition
connectTables(deviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE);
@@ -276,34 +279,14 @@
applyRule(flowRule, true);
}
- private void setupHostGwRule(K8sNetwork k8sNetwork) {
- TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
- sBuilder.matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32));
-
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
- tBuilder.setOutput(PortNumber.LOCAL);
-
- for (K8sNode node : k8sNodeService.completeNodes()) {
- FlowRule flowRule = DefaultFlowRule.builder()
- .forDevice(node.intgBridge())
- .withSelector(sBuilder.build())
- .withTreatment(tBuilder.build())
- .withPriority(HIGH_PRIORITY)
- .fromApp(appId)
- .makePermanent()
- .forTable(JUMP_TABLE)
- .build();
- applyRule(flowRule, true);
- }
-
- sBuilder = DefaultTrafficSelector.builder();
- sBuilder.matchEthType(Ethernet.TYPE_IPV4)
+ private void setupHostRoutingRule(K8sNetwork k8sNetwork) {
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
.matchIPSrc(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32))
.matchIPDst(IpPrefix.valueOf(k8sNetwork.cidr()));
- tBuilder = DefaultTrafficTreatment.builder();
- tBuilder.setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
.transition(STAT_OUTBOUND_TABLE);
for (K8sNode node : k8sNodeService.completeNodes()) {
@@ -314,7 +297,29 @@
.withPriority(HIGH_PRIORITY)
.fromApp(appId)
.makePermanent()
- .forTable(JUMP_TABLE)
+ .forTable(ROUTING_TABLE)
+ .build();
+ applyRule(flowRule, true);
+ }
+ }
+
+ private void setupGatewayRoutingRule(K8sNetwork k8sNetwork) {
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32));
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.LOCAL);
+
+ for (K8sNode node : k8sNodeService.completeNodes()) {
+ FlowRule flowRule = DefaultFlowRule.builder()
+ .forDevice(node.intgBridge())
+ .withSelector(sBuilder.build())
+ .withTreatment(tBuilder.build())
+ .withPriority(HIGH_PRIORITY)
+ .fromApp(appId)
+ .makePermanent()
+ .forTable(ROUTING_TABLE)
.build();
applyRule(flowRule, true);
}
@@ -346,7 +351,10 @@
}
initializePipeline(node);
- k8sNetworkService.networks().forEach(K8sFlowRuleManager.this::setupHostGwRule);
+ k8sNetworkService.networks().forEach(n -> {
+ setupHostRoutingRule(n);
+ setupGatewayRoutingRule(n);
+ });
}
}
@@ -375,7 +383,8 @@
return;
}
- setupHostGwRule(network);
+ setupHostRoutingRule(network);
+ setupGatewayRoutingRule(network);
}
}
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
new file mode 100644
index 0000000..aa2b90f
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sGroupRuleService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupDescription.Type;
+import org.onosproject.net.group.GroupService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getGroupKey;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Sets group table rules directly using GroupService.
+ */
+@Component(immediate = true, service = K8sGroupRuleService.class)
+public class K8sGroupRuleManager implements K8sGroupRuleService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected GroupService groupService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ private ApplicationId appId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+ coreService.registerApplication(K8S_NETWORKING_APP_ID);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
+ Type type, List<GroupBucket> buckets, boolean install) {
+ GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
+ type, new GroupBuckets(buckets), getGroupKey(groupId), groupId, appId);
+
+ if (install) {
+ groupService.addGroup(groupDesc);
+ } else {
+ groupService.removeGroup(deviceId, getGroupKey(groupId), appId);
+ }
+ }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
new file mode 100644
index 0000000..dfdba64
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import com.google.common.collect.Lists;
+import io.fabric8.kubernetes.api.model.EndpointAddress;
+import io.fabric8.kubernetes.api.model.EndpointPort;
+import io.fabric8.kubernetes.api.model.EndpointSubset;
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Service;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TpPort;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.GroupId;
+import org.onosproject.k8snetworking.api.K8sEndpointsService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sGroupRuleService;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sServiceEvent;
+import org.onosproject.k8snetworking.api.K8sServiceListener;
+import org.onosproject.k8snetworking.api.K8sServiceService;
+import org.onosproject.k8snetworking.util.RulePopulatorUtil;
+import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.ExtensionSelector;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
+import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
+import static org.onosproject.net.group.GroupDescription.Type.SELECT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles the service IP to pod IP related translation traffic.
+ */
+@Component(immediate = true)
+public class K8sServiceHandler {
+
+ private final Logger log = getLogger(getClass());
+
+ // TODO: need to inject service IP CIDR through REST
+ private static final String SERVICE_IP_CIDR = "10.96.0.0/24";
+
+ private static final int HOST_CIDR_NUM = 32;
+
+ private static final String NONE = "None";
+ private static final String CLUSTER_IP = "ClusterIP";
+ private static final String TCP = "TCP";
+
+ private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DriverService driverService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNetworkService k8sNetworkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sFlowRuleService k8sFlowRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sGroupRuleService k8sGroupRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNodeService k8sNodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sEndpointsService k8sEndpointsService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sServiceService k8sServiceService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final InternalNodeEventListener internalNodeEventListener =
+ new InternalNodeEventListener();
+ private final InternalK8sServiceListener internalK8sServiceListener =
+ new InternalK8sServiceListener();
+
+ private AtomicCounter groupIdCounter;
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ k8sNodeService.addListener(internalNodeEventListener);
+ k8sServiceService.addListener(internalK8sServiceListener);
+
+ groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ leadershipService.withdraw(appId.name());
+ k8sNodeService.removeListener(internalNodeEventListener);
+ k8sServiceService.removeListener(internalK8sServiceListener);
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ private void setServiceNatRules(DeviceId deviceId, boolean install) {
+ // -trk CT rules
+ long ctUntrack = computeCtStateFlag(false, false, false);
+ long ctMaskUntrack = computeCtMaskFlag(true, false, false);
+
+ k8sNetworkService.networks().forEach(n -> {
+ // TODO: need to provide a way to add multiple service IP CIDR ranges
+ setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), SERVICE_IP_CIDR,
+ JUMP_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
+ setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
+ JUMP_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
+ });
+
+ // +trk-new CT rules
+ long ctTrackUnnew = computeCtStateFlag(true, false, false);
+ long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
+
+ setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
+ NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
+
+ // +trk+new CT rules
+ long ctTrackNew = computeCtStateFlag(true, true, false);
+ long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
+
+ k8sServiceService.services().stream()
+ .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
+ .forEach(s -> setGroupFlowRules(deviceId, ctTrackNew,
+ ctMaskTrackNew, s, install));
+ }
+
+ private void setGroupFlowRules(DeviceId deviceId, long ctState, long ctMask,
+ Service service, boolean install) {
+ int groupId = (int) groupIdCounter.incrementAndGet();
+
+ List<GroupBucket> buckets = Lists.newArrayList();
+
+ String serviceName = service.getMetadata().getName();
+ String serviceIp = service.getSpec().getClusterIP();
+
+ // TODO: multi-ports case should be addressed
+ Integer servicePort = service.getSpec().getPorts().get(0).getPort();
+
+ List<Endpoints> endpointses = k8sEndpointsService.endpointses()
+ .stream()
+ .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
+ .collect(Collectors.toList());
+
+ Map<String, String> nodeIpGatewayIpMap =
+ nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
+
+ for (Endpoints endpoints : endpointses) {
+ for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+ List<EndpointPort> ports = endpointSubset.getPorts()
+ .stream()
+ .filter(p -> p.getProtocol().equals(TCP))
+ .collect(Collectors.toList());
+
+ for (EndpointAddress address : endpointSubset.getAddresses()) {
+ String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
+ nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
+
+ NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
+ niciraConnTrackTreatmentBuilder(driverService, deviceId)
+ .commit(true)
+ .natAction(true)
+ .natIp(IpAddress.valueOf(podIp))
+ .natFlag(CT_NAT_DST_FLAG);
+
+ ports.forEach(p -> {
+ ExtensionTreatment ctNatTreatment = connTreatmentBuilder
+ .natPort(TpPort.tpPort(p.getPort())).build();
+ ExtensionTreatment resubmitTreatment = buildResubmitExtension(
+ deviceService.getDevice(deviceId), ROUTING_TABLE);
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .extension(ctNatTreatment, deviceId)
+ .extension(resubmitTreatment, deviceId)
+ .build();
+ buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
+ });
+ }
+ }
+ }
+
+ if (!buckets.isEmpty()) {
+ k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
+
+ setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
+ TpPort.tpPort(servicePort), NAT_TABLE, groupId,
+ PRIORITY_CT_RULE, install);
+ }
+ }
+
+ private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
+ String srcCidr, String dstCidr, int installTable,
+ int transitTable, int priority, boolean install) {
+ ExtensionSelector esCtSate = RulePopulatorUtil
+ .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPSrc(IpPrefix.valueOf(srcCidr))
+ .matchIPDst(IpPrefix.valueOf(dstCidr))
+ .extension(esCtSate, deviceId)
+ .build();
+
+ NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
+ niciraConnTrackTreatmentBuilder(driverService, deviceId)
+ .natAction(false)
+ .commit(false)
+ .table((short) transitTable);
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .extension(connTreatmentBuilder.build(), deviceId)
+ .build();
+
+ k8sFlowRuleService.setRule(
+ appId,
+ deviceId,
+ selector,
+ treatment,
+ priority,
+ installTable,
+ install);
+ }
+
+ private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
+ IpAddress dstIp, TpPort dstPort, int installTable,
+ int groupId, int priority, boolean install) {
+ ExtensionSelector esCtSate = RulePopulatorUtil
+ .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
+ .matchIPProtocol(IPv4.PROTOCOL_TCP)
+ .matchTcpDst(dstPort)
+ .extension(esCtSate, deviceId)
+ .build();
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .group(GroupId.valueOf(groupId))
+ .build();
+
+ k8sFlowRuleService.setRule(
+ appId,
+ deviceId,
+ selector,
+ treatment,
+ priority,
+ installTable,
+ install);
+ }
+
+ private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
+ int installTable, int transitTable,
+ int priority, boolean install) {
+ ExtensionSelector esCtSate = RulePopulatorUtil
+ .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .extension(esCtSate, deviceId)
+ .build();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .transition(transitTable)
+ .build();
+
+ k8sFlowRuleService.setRule(
+ appId,
+ deviceId,
+ selector,
+ treatment,
+ priority,
+ installTable,
+ install);
+ }
+
+ private class InternalK8sServiceListener implements K8sServiceListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sServiceEvent event) {
+ switch (event.type()) {
+ case K8S_SERVICE_CREATED:
+ eventExecutor.execute(() -> processServiceCreation(event.subject()));
+ break;
+ case K8S_SERVICE_REMOVED:
+ eventExecutor.execute(() -> processServiceRemoval(event.subject()));
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processServiceCreation(Service service) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ long ctTrackNew = computeCtStateFlag(true, true, false);
+ long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
+
+ k8sNodeService.completeNodes().forEach(n ->
+ setGroupFlowRules(n.intgBridge(), ctTrackNew,
+ ctMaskTrackNew, service, true));
+ }
+
+ private void processServiceRemoval(Service service) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ long ctTrackNew = computeCtStateFlag(true, true, false);
+ long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
+
+ k8sNodeService.completeNodes().forEach(n ->
+ setGroupFlowRules(n.intgBridge(), ctTrackNew,
+ ctMaskTrackNew, service, false)); }
+ }
+
+ private class InternalNodeEventListener implements K8sNodeListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sNodeEvent event) {
+ K8sNode k8sNode = event.subject();
+ switch (event.type()) {
+ case K8S_NODE_COMPLETE:
+ eventExecutor.execute(() -> processNodeCompletion(k8sNode));
+ break;
+ case K8S_NODE_INCOMPLETE:
+ eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void processNodeCompletion(K8sNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setServiceNatRules(node.intgBridge(), true);
+ }
+
+ private void processNodeIncompletion(K8sNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setServiceNatRules(node.intgBridge(), false);
+ }
+ }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
index 53af926..88cb589 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
@@ -17,6 +17,7 @@
import com.google.common.base.Strings;
import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cfg.ConfigProperty;
import org.onosproject.cluster.LeadershipService;
@@ -116,6 +117,8 @@
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
k8sNetworkService.addListener(k8sNetworkListener);
+ setGatewayRulesForTunnel(true);
+
log.info("Started");
}
@@ -124,6 +127,8 @@
k8sNetworkService.removeListener(k8sNetworkListener);
eventExecutor.shutdown();
+ setGatewayRulesForTunnel(false);
+
log.info("Stopped");
}
@@ -240,6 +245,35 @@
install);
}
+ private void setGatewayRulesForTunnel(boolean install) {
+ k8sNetworkService.networks().forEach(n -> {
+ // switching rules for the instPorts in the same node
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ // TODO: need to handle IPv6 in near future
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPDst(IpPrefix.valueOf(n.gatewayIp(), 32))
+ .matchTunnelId(Long.valueOf(n.segmentId()))
+ .build();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.LOCAL)
+ .build();
+
+ // FIXME: need to find a way to install the gateway rules into
+ // right OVS
+ k8sNodeService.completeNodes().forEach(node -> {
+ k8sFlowRuleService.setRule(
+ appId,
+ node.intgBridge(),
+ selector,
+ treatment,
+ PRIORITY_SWITCHING_RULE,
+ FORWARDING_TABLE,
+ install);
+ });
+ });
+ }
+
/**
* Obtains the VNI from the given kubernetes port.
*
@@ -261,7 +295,8 @@
K8sNetwork k8sNet = k8sNetworkService.network(port.networkId());
if (k8sNet == null) {
- log.warn("Network {} is not found from port {}.", port.networkId(), port.portId());
+ log.warn("Network {} is not found from port {}.",
+ port.networkId(), port.portId());
return;
}
@@ -272,7 +307,8 @@
setNetworkRulesForTunnel(port, install);
break;
default:
- log.warn("The given network type {} is not supported.", k8sNet.type().name());
+ log.warn("The given network type {} is not supported.",
+ k8sNet.type().name());
break;
}
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index 2e838e8..2647d70 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -31,13 +32,17 @@
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -168,6 +173,16 @@
}
/**
+ * Obtains flow group key from the given id.
+ *
+ * @param groupId flow group identifier
+ * @return flow group key
+ */
+ public static GroupKey getGroupKey(int groupId) {
+ return new DefaultGroupKey((Integer.toString(groupId)).getBytes());
+ }
+
+ /**
* Generates endpoint URL by referring to scheme, ipAddress and port.
*
* @param scheme scheme
@@ -248,4 +263,25 @@
return client;
}
+
+ /**
+ * Obtains the kubernetes node IP and kubernetes network gateway IP map.
+ *
+ * @param nodeService kubernetes node service
+ * @param networkService kubernetes network service
+ * @return kubernetes node IP and kubernetes network gateway IP map
+ */
+ public static Map<String, String> nodeIpGatewayIpMap(K8sNodeService nodeService,
+ K8sNetworkService networkService) {
+ Map<String, String> ipMap = Maps.newConcurrentMap();
+
+ nodeService.completeNodes().forEach(n -> {
+ K8sNetwork network = networkService.network(n.hostname());
+ if (network != null) {
+ ipMap.put(n.dataIp().toString(), network.gatewayIp().toString());
+ }
+ });
+
+ return ipMap;
+ }
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
index d5366ae..8192015 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
@@ -16,14 +16,30 @@
package org.onosproject.k8snetworking.util;
import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.ExtensionSelectorResolver;
import org.onosproject.net.behaviour.ExtensionTreatmentResolver;
import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.ExtensionSelector;
+import org.onosproject.net.flow.criteria.ExtensionSelectorType;
import org.onosproject.net.flow.instructions.ExtensionPropertyException;
import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.net.flow.instructions.ExtensionTreatmentType;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupDescription.Type;
import org.slf4j.Logger;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_RESUBMIT_TABLE;
import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_SET_TUNNEL_DST;
import static org.slf4j.LoggerFactory.getLogger;
@@ -36,10 +52,145 @@
private static final String TUNNEL_DST = "tunnelDst";
+ private static final String CT_FLAGS = "flags";
+ private static final String CT_ZONE = "zone";
+ private static final String CT_TABLE = "recircTable";
+ private static final String CT_STATE = "ctState";
+ private static final String CT_STATE_MASK = "ctStateMask";
+ private static final String CT_PRESENT_FLAGS = "presentFlags";
+ private static final String CT_IPADDRESS_MIN = "ipAddressMin";
+ private static final String CT_IPADDRESS_MAX = "ipAddressMax";
+ private static final String CT_PORT_MIN = "portMin";
+ private static final String CT_PORT_MAX = "portMax";
+ private static final String CT_NESTED_ACTIONS = "nestedActions";
+
+ public static final int CT_NAT_SRC_FLAG = 0;
+ public static final int CT_NAT_DST_FLAG = 1;
+ public static final int CT_NAT_PERSISTENT_FLAG = 2;
+ public static final int CT_NAT_PROTO_HASH_FLAG = 3;
+ public static final int CT_NAT_PROTO_RANDOM_FLAG = 4;
+
+ private static final int ADDRESS_V4_MIN_FLAG = 0;
+ private static final int ADDRESS_V4_MAX_FLAG = 1;
+ private static final int ADDRESS_V6_MIN_FLAG = 2;
+ private static final int ADDRESS_V6_MAX_FLAG = 3;
+ private static final int PORT_MIN_FLAG = 4;
+ private static final int PORT_MAX_FLAG = 5;
+
+ public static final long CT_STATE_NONE = 0;
+ public static final long CT_STATE_NEW = 0x01;
+ public static final long CT_STATE_EST = 0x02;
+ public static final long CT_STATE_NOT_TRK = 0x20;
+ public static final long CT_STATE_TRK = 0x20;
+
+ private static final String TABLE_EXTENSION = "table";
+
+ // not intended for direct invocation from external
private RulePopulatorUtil() {
}
/**
+ * Returns a builder for OVS Connection Tracking feature actions.
+ *
+ * @param ds DriverService
+ * @param id DeviceId
+ * @return a builder for OVS Connection Tracking feature actions
+ */
+ public static NiciraConnTrackTreatmentBuilder
+ niciraConnTrackTreatmentBuilder(DriverService ds, DeviceId id) {
+ return new NiciraConnTrackTreatmentBuilder(ds, id);
+ }
+
+ /**
+ * Builds OVS ConnTrack matches.
+ *
+ * @param driverService driver service
+ * @param deviceId device ID
+ * @param ctState connection tracking sate masking value
+ * @param ctSateMask connection tracking sate masking value
+ * @return OVS ConnTrack extension match
+ */
+ public static ExtensionSelector buildCtExtensionSelector(DriverService driverService,
+ DeviceId deviceId,
+ long ctState,
+ long ctSateMask) {
+ DriverHandler handler = driverService.createHandler(deviceId);
+ ExtensionSelectorResolver esr = handler.behaviour(ExtensionSelectorResolver.class);
+
+ ExtensionSelector extensionSelector = esr.getExtensionSelector(
+ ExtensionSelectorType.ExtensionSelectorTypes.NICIRA_MATCH_CONNTRACK_STATE.type());
+ try {
+ extensionSelector.setPropertyValue(CT_STATE, ctState);
+ extensionSelector.setPropertyValue(CT_STATE_MASK, ctSateMask);
+ } catch (Exception e) {
+ log.error("Failed to set nicira match CT state because of {}", e);
+ return null;
+ }
+
+ return extensionSelector;
+ }
+
+ /**
+ * Computes ConnTack State flag values.
+ *
+ * @param isTracking true for +trk, false for -trk
+ * @param isNew true for +new, false for -new
+ * @param isEstablished true for +est, false for -est
+ * @return ConnTrack State flags
+ */
+ public static long computeCtStateFlag(boolean isTracking,
+ boolean isNew,
+ boolean isEstablished) {
+ long ctStateFlag = 0x00;
+
+ if (isTracking) {
+ ctStateFlag = ctStateFlag | CT_STATE_TRK;
+ }
+
+ if (isNew) {
+ ctStateFlag = ctStateFlag | CT_STATE_TRK;
+ ctStateFlag = ctStateFlag | CT_STATE_NEW;
+ }
+
+ if (isEstablished) {
+ ctStateFlag = ctStateFlag | CT_STATE_TRK;
+ ctStateFlag = ctStateFlag | CT_STATE_EST;
+ }
+
+ return ctStateFlag;
+ }
+
+ /**
+ * Computes ConnTrack State mask values.
+ *
+ * @param isTracking true for setting +trk/-trk value, false for otherwise
+ * @param isNew true for setting +new/-new value, false for otherwise
+ * @param isEstablished true for setting +est/-est value, false for otherwise
+ * @return ConnTrack State Mask value
+ */
+ public static long computeCtMaskFlag(boolean isTracking,
+ boolean isNew,
+ boolean isEstablished) {
+ long ctMaskFlag = 0x00;
+
+ if (isTracking) {
+ ctMaskFlag = ctMaskFlag | CT_STATE_TRK;
+ }
+
+ if (isNew) {
+ ctMaskFlag = ctMaskFlag | CT_STATE_TRK;
+ ctMaskFlag = ctMaskFlag | CT_STATE_NEW;
+ }
+
+ if (isEstablished) {
+ ctMaskFlag = ctMaskFlag | CT_STATE_TRK;
+ ctMaskFlag = ctMaskFlag | CT_STATE_EST;
+ }
+
+ return ctMaskFlag;
+ }
+
+ /**
* Returns tunnel destination extension treatment object.
*
* @param deviceService driver service
@@ -72,4 +223,231 @@
return null;
}
}
+
+ /**
+ * Returns the group bucket with given traffic treatment and group type.
+ *
+ * @param treatment traffic treatment
+ * @param type group type
+ * @param weight weight (only for select type)
+ * @return group bucket
+ */
+ public static GroupBucket buildGroupBucket(TrafficTreatment treatment,
+ Type type, short weight) {
+ switch (type) {
+ case ALL:
+ return DefaultGroupBucket.createAllGroupBucket(treatment);
+ case SELECT:
+ if (weight == -1) {
+ return DefaultGroupBucket.createSelectGroupBucket(treatment);
+ } else {
+ return DefaultGroupBucket.createSelectGroupBucket(treatment, weight);
+ }
+ case INDIRECT:
+ return DefaultGroupBucket.createIndirectGroupBucket(treatment);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Returns the nicira resubmit extension treatment with given table ID.
+ *
+ * @param device device identifier
+ * @param tableId table identifier
+ * @return resubmit extension treatment
+ */
+ public static ExtensionTreatment buildResubmitExtension(Device device, int tableId) {
+ if (device == null || !device.is(ExtensionTreatmentResolver.class)) {
+ log.warn("Nicira extension treatment is not supported");
+ return null;
+ }
+
+ ExtensionTreatmentResolver resolver = device.as(ExtensionTreatmentResolver.class);
+ ExtensionTreatment treatment =
+ resolver.getExtensionInstruction(NICIRA_RESUBMIT_TABLE.type());
+
+ try {
+ treatment.setPropertyValue(TABLE_EXTENSION, ((short) tableId));
+ return treatment;
+ } catch (ExtensionPropertyException e) {
+ log.error("Failed to set nicira resubmit extension treatment for {}",
+ device.id());
+ return null;
+ }
+ }
+
+ /**
+ * Builder class for OVS Connection Tracking feature actions.
+ */
+ public static final class NiciraConnTrackTreatmentBuilder {
+
+ private DriverService driverService;
+ private DeviceId deviceId;
+ private IpAddress natAddress = null;
+ private TpPort natPort = null;
+ private int zone;
+ private boolean commit;
+ private short table = -1;
+ private boolean natAction;
+ private int natFlag;
+
+ // private constructor
+ private NiciraConnTrackTreatmentBuilder(DriverService driverService,
+ DeviceId deviceId) {
+ this.driverService = driverService;
+ this.deviceId = deviceId;
+ }
+
+ /**
+ * Sets commit flag.
+ *
+ * @param c true if commit, false if not.
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder commit(boolean c) {
+ this.commit = c;
+ return this;
+ }
+
+ /**
+ * Sets zone number.
+ *
+ * @param z zone number
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder zone(int z) {
+ this.zone = z;
+ return this;
+ }
+
+ /**
+ * Sets recirculation table number.
+ *
+ * @param t table number to restart
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder table(short t) {
+ this.table = t;
+ return this;
+ }
+
+ /**
+ * Sets IP address for NAT.
+ *
+ * @param ip NAT IP address
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder natIp(IpAddress ip) {
+ this.natAddress = ip;
+ return this;
+ }
+
+ /**
+ * Sets port for NAT.
+ *
+ * @param port port number
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder natPort(TpPort port) {
+ this.natPort = port;
+ return this;
+ }
+
+ /**
+ * Sets NAT flags.
+ * SRC NAT: 1 << 0
+ * DST NAT: 1 << 1
+ * PERSISTENT NAT: 1 << 2
+ * PROTO_HASH NAT: 1 << 3
+ * PROTO_RANDOM NAT : 1 << 4
+ *
+ * @param flag flag value
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder natFlag(int flag) {
+ this.natFlag = 1 << flag;
+ return this;
+ }
+
+ /**
+ * Sets the flag for NAT action.
+ *
+ * @param nat nat action is included if true, no nat action otherwise
+ * @return NiciraConnTrackTreatmentBuilder object
+ */
+ public NiciraConnTrackTreatmentBuilder natAction(boolean nat) {
+ this.natAction = nat;
+ return this;
+ }
+
+ /**
+ * Builds extension treatment for OVS ConnTack and NAT feature.
+ *
+ * @return ExtensionTreatment object
+ */
+ public ExtensionTreatment build() {
+ DriverHandler handler = driverService.createHandler(deviceId);
+ ExtensionTreatmentResolver etr =
+ handler.behaviour(ExtensionTreatmentResolver.class);
+
+ ExtensionTreatment natTreatment = etr.getExtensionInstruction(
+ ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_NAT.type());
+ try {
+
+ natTreatment.setPropertyValue(CT_FLAGS, this.natFlag);
+
+ natTreatment.setPropertyValue(CT_PRESENT_FLAGS,
+ buildPresentFlag(natPort != null, natAddress != null));
+
+ if (natAddress != null) {
+ natTreatment.setPropertyValue(CT_IPADDRESS_MIN, natAddress);
+ natTreatment.setPropertyValue(CT_IPADDRESS_MAX, natAddress);
+ }
+
+ if (natPort != null) {
+ natTreatment.setPropertyValue(CT_PORT_MIN, natPort.toInt());
+ natTreatment.setPropertyValue(CT_PORT_MAX, natPort.toInt());
+ }
+
+ } catch (Exception e) {
+ log.error("Failed to set NAT due to error : {}", e);
+ return null;
+ }
+
+ ExtensionTreatment ctTreatment = etr.getExtensionInstruction(
+ ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_CT.type());
+ try {
+ List<ExtensionTreatment> nat = new ArrayList<>();
+ if (natAction) {
+ nat.add(natTreatment);
+ }
+ ctTreatment.setPropertyValue(CT_FLAGS, commit ? 1 : 0);
+ ctTreatment.setPropertyValue(CT_ZONE, zone);
+ ctTreatment.setPropertyValue(CT_TABLE, table > -1 ? table : 0xff);
+ ctTreatment.setPropertyValue(CT_NESTED_ACTIONS, nat);
+ } catch (Exception e) {
+ log.error("Failed to set CT due to error : {}", e);
+ return null;
+ }
+
+ return ctTreatment;
+ }
+
+ private int buildPresentFlag(boolean isPortPresent, boolean isAddressPresent) {
+
+ int presentFlag = 0;
+
+ if (isPortPresent) {
+ presentFlag = 1 << PORT_MIN_FLAG | 1 << PORT_MAX_FLAG;
+ }
+
+ if (isAddressPresent) {
+ // TODO: need to support IPv6 address
+ presentFlag = presentFlag | 1 << ADDRESS_V4_MIN_FLAG | 1 << ADDRESS_V4_MAX_FLAG;
+ }
+
+ return presentFlag;
+ }
+ }
}