blob: 5ddc8724425bb40267e1802e8c02fc2c740e978c [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.Ethernet;
19import org.onlab.packet.IpPrefix;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.k8snetworking.api.K8sFlowRuleService;
26import org.onosproject.k8snetworking.api.K8sNetwork;
27import org.onosproject.k8snetworking.api.K8sNetworkEvent;
28import org.onosproject.k8snetworking.api.K8sNetworkListener;
29import org.onosproject.k8snetworking.api.K8sNetworkService;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeEvent;
32import org.onosproject.k8snode.api.K8sNodeListener;
33import org.onosproject.k8snode.api.K8sNodeService;
34import org.onosproject.net.PortNumber;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.driver.DriverService;
37import org.onosproject.net.flow.DefaultTrafficSelector;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.TrafficSelector;
40import org.onosproject.net.flow.TrafficTreatment;
41import org.onosproject.net.packet.PacketService;
42import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
45import org.osgi.service.component.annotations.Reference;
46import org.osgi.service.component.annotations.ReferenceCardinality;
47import org.slf4j.Logger;
48
49import java.util.Objects;
50import java.util.concurrent.ExecutorService;
51
52import static java.util.concurrent.Executors.newSingleThreadExecutor;
53import static org.onlab.util.Tools.groupedThreads;
54import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
55import static org.onosproject.k8snetworking.api.Constants.PRIORITY_GATEWAY_RULE;
56import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
57import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
58import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
59import static org.slf4j.LoggerFactory.getLogger;
60
61/**
62 * Populates switching flow rules on OVS for providing the connectivity between
63 * container and network gateway.
64 */
65@Component(immediate = true)
66public class K8sSwitchingGatewayHandler {
67
68 private final Logger log = getLogger(getClass());
69
70 private static final int GW_IP_PREFIX = 32;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected CoreService coreService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected ClusterService clusterService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected LeadershipService leadershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected DeviceService deviceService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected DriverService driverService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected PacketService packetService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected K8sFlowRuleService k8sFlowRuleService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected K8sNetworkService k8sNetworkService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected K8sNodeService k8sNodeService;
98
99 private final ExecutorService eventExecutor = newSingleThreadExecutor(
100 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
101 private final InternalK8sNetworkListener k8sNetworkListener =
102 new InternalK8sNetworkListener();
103 private final InternalK8sNodeListener k8sNodeListener =
104 new InternalK8sNodeListener();
105
106 private ApplicationId appId;
107 private NodeId localNodeId;
108
109 @Activate
110 protected void activate() {
111 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
112 k8sNetworkService.addListener(k8sNetworkListener);
113 k8sNodeService.addListener(k8sNodeListener);
114 localNodeId = clusterService.getLocalNode().id();
115 leadershipService.runForLeadership(appId.name());
116
117 log.info("Started");
118 }
119
120 @Deactivate
121 protected void deactivate() {
122 k8sNodeService.removeListener(k8sNodeListener);
123 k8sNetworkService.removeListener(k8sNetworkListener);
124 leadershipService.withdraw(appId.name());
125 eventExecutor.shutdown();
126
127 log.info("Stopped");
128 }
129
130 private void setGatewayRule(K8sNetwork k8sNetwork, boolean install) {
131 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
132 .matchEthType(Ethernet.TYPE_IPV4)
133 .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), GW_IP_PREFIX));
134
135 for (K8sNode node : k8sNodeService.completeNodes()) {
136 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
137
138 if (node.hostname().equals(k8sNetwork.name())) {
Jian Lieb488ea2019-04-16 01:50:02 +0900139 tBuilder.setEthDst(node.intgBridgeMac())
Jian Li7d111d72019-04-12 13:58:44 +0900140 .setOutput(PortNumber.LOCAL);
141 } else {
142 PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(),
143 k8sNetworkService, node);
144 K8sNode localNode = k8sNodeService.node(k8sNetwork.name());
145
146 tBuilder.extension(buildExtension(
147 deviceService,
148 node.intgBridge(),
149 localNode.dataIp().getIp4Address()),
150 node.intgBridge())
151 .setOutput(portNum);
152 }
153
154 k8sFlowRuleService.setRule(
155 appId,
156 node.intgBridge(),
157 sBuilder.build(),
158 tBuilder.build(),
159 PRIORITY_GATEWAY_RULE,
160 ROUTING_TABLE,
161 install);
162 }
163 }
164
165 private class InternalK8sNetworkListener implements K8sNetworkListener {
166
167 private boolean isRelevantHelper() {
168 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
169 }
170
171 @Override
172 public void event(K8sNetworkEvent event) {
173 switch (event.type()) {
174 case K8S_NETWORK_CREATED:
175 case K8S_NETWORK_UPDATED:
176 eventExecutor.execute(() -> processNetworkCreation(event));
177 break;
178 case K8S_NETWORK_REMOVED:
179 eventExecutor.execute(() -> processNetworkRemoval(event));
180 break;
181 default:
182 break;
183 }
184 }
185
186 private void processNetworkCreation(K8sNetworkEvent event) {
187 if (!isRelevantHelper()) {
188 return;
189 }
190
191 setGatewayRule(event.subject(), true);
192 }
193
194 private void processNetworkRemoval(K8sNetworkEvent event) {
195 if (!isRelevantHelper()) {
196 return;
197 }
198
199 setGatewayRule(event.subject(), false);
200 }
201 }
202
203 private class InternalK8sNodeListener implements K8sNodeListener {
204 private boolean isRelevantHelper() {
205 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
206 }
207
208 @Override
209 public void event(K8sNodeEvent event) {
210 switch (event.type()) {
211 case K8S_NODE_COMPLETE:
212 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
213 break;
214 case K8S_NODE_INCOMPLETE:
215 default:
216 break;
217 }
218 }
219
220 private void processNodeCompletion(K8sNode node) {
221 log.info("COMPLETE node {} is detected", node.hostname());
222
223 if (!isRelevantHelper()) {
224 return;
225 }
226
227 k8sNetworkService.networks().forEach(n -> setGatewayRule(n, true));
228 }
229 }
230}