blob: b94c7c4354c5da95b92f4317ba47fef509dc5454 [file] [log] [blame]
Jian Li43244382021-01-09 00:19:02 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import org.onosproject.cluster.ClusterService;
19import org.onosproject.cluster.LeadershipService;
Jian Lidc08c952022-03-23 13:28:01 +090020import org.onosproject.cluster.NodeId;
Jian Li43244382021-01-09 00:19:02 +090021import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
24import org.onosproject.kubevirtnode.api.KubevirtNode;
Jian Lidc08c952022-03-23 13:28:01 +090025import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
26import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
Jian Li43244382021-01-09 00:19:02 +090027import org.onosproject.kubevirtnode.api.KubevirtNodeService;
28import org.onosproject.mastership.MastershipService;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.Port;
31import org.onosproject.net.PortNumber;
32import org.onosproject.net.device.DeviceEvent;
33import org.onosproject.net.device.DeviceListener;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.driver.DriverService;
36import org.onosproject.net.flow.DefaultTrafficSelector;
37import org.onosproject.net.flow.DefaultTrafficTreatment;
38import org.onosproject.net.flow.TrafficSelector;
39import org.onosproject.net.flow.TrafficTreatment;
40import org.osgi.service.component.annotations.Activate;
41import org.osgi.service.component.annotations.Component;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
Jian Lidc08c952022-03-23 13:28:01 +090047import java.util.Objects;
Jian Li43244382021-01-09 00:19:02 +090048import java.util.Set;
49import java.util.concurrent.ExecutorService;
50import java.util.stream.Collectors;
51
52import static java.util.concurrent.Executors.newSingleThreadExecutor;
53import static org.onlab.util.Tools.groupedThreads;
Jian Lif89d9602021-04-27 19:05:49 +090054import static org.onosproject.kubevirtnetworking.api.Constants.ACL_INGRESS_TABLE;
55import static org.onosproject.kubevirtnetworking.api.Constants.ARP_TABLE;
Jian Li43244382021-01-09 00:19:02 +090056import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Li858ccd72021-02-04 17:25:01 +090057import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
Jian Li43244382021-01-09 00:19:02 +090058import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.structurePortName;
59import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
60import static org.onosproject.net.AnnotationKeys.PORT_NAME;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
Jian Li858ccd72021-02-04 17:25:01 +090064 * Populates switching flow rules on OVS for the provider network (underlay).
Jian Li43244382021-01-09 00:19:02 +090065 */
66@Component(immediate = true)
67public class KubevirtSwitchingPhysicalHandler {
68 private final Logger log = getLogger(getClass());
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected CoreService coreService;
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected MastershipService mastershipService;
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected DeviceService deviceService;
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected DriverService driverService;
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected ClusterService clusterService;
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected LeadershipService leadershipService;
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected KubevirtFlowRuleService flowRuleService;
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lidc08c952022-03-23 13:28:01 +090085 protected KubevirtNodeService kubevirtNodeService;
Jian Li43244382021-01-09 00:19:02 +090086
87 private final ExecutorService eventExecutor = newSingleThreadExecutor(
88 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
89 private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
Jian Lidc08c952022-03-23 13:28:01 +090090 private final InternalKubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
Jian Li43244382021-01-09 00:19:02 +090091 private ApplicationId appId;
Jian Lidc08c952022-03-23 13:28:01 +090092 private NodeId localNodeId;
Jian Li43244382021-01-09 00:19:02 +090093
94 @Activate
95 protected void activate() {
96 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
Jian Lidc08c952022-03-23 13:28:01 +090097 localNodeId = clusterService.getLocalNode().id();
Jian Li43244382021-01-09 00:19:02 +090098 deviceService.addListener(internalDeviceListener);
Jian Lidc08c952022-03-23 13:28:01 +090099 kubevirtNodeService.addListener(kubevirtNodeListener);
Jian Li43244382021-01-09 00:19:02 +0900100 log.info("Started");
101 }
102
103 @Deactivate
104 protected void deactivate() {
105 eventExecutor.shutdown();
106 deviceService.removeListener(internalDeviceListener);
Jian Lidc08c952022-03-23 13:28:01 +0900107 kubevirtNodeService.removeListener(kubevirtNodeListener);
Jian Li43244382021-01-09 00:19:02 +0900108 log.info("Stopped");
109 }
110
111 private boolean containsPhyPatchPort(KubevirtNode node, Port port) {
112 Set<String> intPatchPorts = node.phyIntfs().stream()
113 .map(pi -> structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX
114 + pi.network())).collect(Collectors.toSet());
115 String portName = port.annotations().value(PORT_NAME);
116 return intPatchPorts.contains(portName);
117 }
118
Jian Lif89d9602021-04-27 19:05:49 +0900119 private void setIngressRuleForPatchPort(DeviceId deviceId,
120 PortNumber portNumber,
121 boolean install) {
Jian Li43244382021-01-09 00:19:02 +0900122 TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
123 .matchInPort(portNumber);
124
125 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder()
Jian Lif89d9602021-04-27 19:05:49 +0900126 .transition(ACL_INGRESS_TABLE);
Jian Li43244382021-01-09 00:19:02 +0900127
128 flowRuleService.setRule(
129 appId,
130 deviceId,
131 selector.build(),
132 treatment.build(),
Jian Li858ccd72021-02-04 17:25:01 +0900133 PRIORITY_FORWARDING_RULE,
Jian Lif89d9602021-04-27 19:05:49 +0900134 ARP_TABLE,
Jian Li43244382021-01-09 00:19:02 +0900135 install);
136 }
137
138 private class InternalDeviceListener implements DeviceListener {
139
140 @Override
141 public boolean isRelevant(DeviceEvent event) {
142 Port port = event.port();
143 if (port == null) {
144 return false;
145 }
146
Jian Lidc08c952022-03-23 13:28:01 +0900147 KubevirtNode node = kubevirtNodeService.node(event.subject().id());
Jian Li43244382021-01-09 00:19:02 +0900148 if (node == null) {
149 return false;
150 }
151
152 return containsPhyPatchPort(node, port);
153 }
154
155 private boolean isRelevantHelper(DeviceEvent event) {
156 return mastershipService.isLocalMaster(event.subject().id());
157 }
158
159 @Override
160 public void event(DeviceEvent event) {
161 log.info("Device event occurred with type {}", event.type());
162
163 switch (event.type()) {
164 case PORT_ADDED:
165 case PORT_UPDATED:
166 eventExecutor.execute(() -> processPortAddition(event));
167 break;
168 case PORT_REMOVED:
169 eventExecutor.execute(() -> processPortRemoval(event));
170 break;
171 default:
172 break;
173 }
174 }
175
176 private void processPortAddition(DeviceEvent event) {
177 if (!isRelevantHelper(event)) {
178 return;
179 }
Jian Lif89d9602021-04-27 19:05:49 +0900180 setIngressRuleForPatchPort(event.subject().id(),
Jian Li43244382021-01-09 00:19:02 +0900181 event.port().number(), true);
182 }
183 private void processPortRemoval(DeviceEvent event) {
184 if (!isRelevantHelper(event)) {
185 return;
186 }
Jian Lif89d9602021-04-27 19:05:49 +0900187 setIngressRuleForPatchPort(event.subject().id(),
Jian Li43244382021-01-09 00:19:02 +0900188 event.port().number(), false);
189 }
190 }
Jian Lidc08c952022-03-23 13:28:01 +0900191
192 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
193 private boolean isRelevantHelper() {
194 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
195 }
196
197 @Override
198 public void event(KubevirtNodeEvent event) {
199 switch (event.type()) {
200 case KUBEVIRT_NODE_COMPLETE:
201 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
202 break;
203 case KUBEVIRT_NODE_INCOMPLETE:
204 default:
205 // do nothing
206 break;
207 }
208 }
209
210 private void processNodeCompletion(KubevirtNode node) {
211 if (!isRelevantHelper()) {
212 return;
213 }
214
215 deviceService.getPorts(node.intgBridge()).forEach(p -> {
216 if (containsPhyPatchPort(node, p)) {
217 setIngressRuleForPatchPort(node.intgBridge(), p.number(), true);
218 }
219 });
220 }
221 }
Jian Li43244382021-01-09 00:19:02 +0900222}