blob: 3367d1f0dace599979d683898799d29513ccdf2e [file] [log] [blame]
Jian Li0b481122021-01-17 04:26:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.packet.MacAddress;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
24import org.onosproject.kubevirtnetworking.api.KubevirtPort;
25import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
26import org.onosproject.kubevirtnetworking.api.KubevirtPortStore;
27import org.onosproject.kubevirtnetworking.api.KubevirtPortStoreDelegate;
Jian Li8f944d42021-03-23 00:43:29 +090028import org.onosproject.net.DeviceId;
Jian Li0b481122021-01-17 04:26:18 +090029import org.onosproject.store.AbstractStore;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.ConsistentMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.util.Collection;
Jian Li8f944d42021-03-23 00:43:29 +090045import java.util.Objects;
Jian Li0b481122021-01-17 04:26:18 +090046import java.util.Set;
47import java.util.concurrent.ExecutorService;
48
49import static com.google.common.base.Preconditions.checkArgument;
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_CREATED;
Jian Li8f944d42021-03-23 00:43:29 +090053import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_DEVICE_ADDED;
Jian Li969abd82022-10-17 18:28:19 +090054import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_MIGRATED;
Jian Li0b481122021-01-17 04:26:18 +090055import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_REMOVED;
Jian Li8f944d42021-03-23 00:43:29 +090056import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_SECURITY_GROUP_ADDED;
57import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_SECURITY_GROUP_REMOVED;
Jian Li0b481122021-01-17 04:26:18 +090058import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_UPDATED;
59import static org.slf4j.LoggerFactory.getLogger;
60
61/**
62 * Implementation of kubevirt pod store using consistent map.
63 */
64@Component(immediate = true, service = KubevirtPortStore.class)
65public class DistributedKubevirtPortStore
66 extends AbstractStore<KubevirtPortEvent, KubevirtPortStoreDelegate>
67 implements KubevirtPortStore {
68
69 private final Logger log = getLogger(getClass());
70
71 private static final String ERR_NOT_FOUND = " does not exist";
72 private static final String ERR_DUPLICATE = " already exists";
73 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
74
75 private static final KryoNamespace
76 SERIALIZER_KUBEVIRT_PORT = KryoNamespace.newBuilder()
77 .register(KryoNamespaces.API)
78 .register(KubevirtPort.class)
79 .register(DefaultKubevirtPort.class)
80 .register(Collection.class)
81 .build();
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected CoreService coreService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected StorageService storageService;
88
89 private final ExecutorService eventExecutor = newSingleThreadExecutor(
90 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
91
92 private final MapEventListener<String, KubevirtPort> portMapListener =
93 new KubevirtPortMapListener();
94
95 private ConsistentMap<String, KubevirtPort> portStore;
96
97 @Activate
98 protected void activate() {
99 ApplicationId appId = coreService.registerApplication(APP_ID);
100 portStore = storageService.<String, KubevirtPort>consistentMapBuilder()
101 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_PORT))
102 .withName("kubevirt-portstore")
103 .withApplicationId(appId)
104 .build();
105 portStore.addListener(portMapListener);
106 log.info("Started");
107 }
108
109 @Deactivate
110 protected void deactivate() {
111 portStore.removeListener(portMapListener);
112 eventExecutor.shutdown();
113 log.info("Stopped");
114 }
115
116 @Override
117 public void createPort(KubevirtPort port) {
118 portStore.compute(port.macAddress().toString(), (mac, existing) -> {
119 final String error = port.macAddress().toString() + ERR_DUPLICATE;
120 checkArgument(existing == null, error);
121 return port;
122 });
123 }
124
125 @Override
126 public void updatePort(KubevirtPort port) {
127 portStore.compute(port.macAddress().toString(), (mac, existing) -> {
128 final String error = port.macAddress().toString() + ERR_NOT_FOUND;
129 checkArgument(existing != null, error);
130 return port;
131 });
132 }
133
134 @Override
135 public KubevirtPort removePort(MacAddress mac) {
136 Versioned<KubevirtPort> port = portStore.remove(mac.toString());
137 if (port == null) {
138 final String error = mac.toString() + ERR_NOT_FOUND;
139 throw new IllegalArgumentException(error);
140 }
141 return port.value();
142 }
143
144 @Override
145 public KubevirtPort port(MacAddress mac) {
146 return portStore.asJavaMap().get(mac.toString());
147 }
148
149 @Override
150 public Set<KubevirtPort> ports() {
151 return ImmutableSet.copyOf(portStore.asJavaMap().values());
152 }
153
154 @Override
155 public void clear() {
156 portStore.clear();
157 }
158
159 private class KubevirtPortMapListener implements MapEventListener<String, KubevirtPort> {
160
161 @Override
162 public void event(MapEvent<String, KubevirtPort> event) {
163 switch (event.type()) {
164 case INSERT:
165 log.debug("Kubevirt port created");
166 eventExecutor.execute(() ->
167 notifyDelegate(new KubevirtPortEvent(
168 KUBEVIRT_PORT_CREATED, event.newValue().value())));
169 break;
170 case UPDATE:
171 log.debug("Kubevirt port updated");
172 eventExecutor.execute(() ->
173 notifyDelegate(new KubevirtPortEvent(
174 KUBEVIRT_PORT_UPDATED, event.newValue().value())));
Jian Li8f944d42021-03-23 00:43:29 +0900175 processSecurityGroupEvent(event.oldValue().value(), event.newValue().value());
176 processDeviceEvent(event.oldValue().value(), event.newValue().value());
Jian Li0b481122021-01-17 04:26:18 +0900177 break;
178 case REMOVE:
179 log.debug("Kubevirt port removed");
180 // if the event object has invalid port value, we do not
181 // propagate KUBEVIRT_PORT_REMOVED event.
182 if (event.oldValue() != null && event.oldValue().value() != null) {
Jian Li810f58c2021-02-27 01:10:50 +0900183 eventExecutor.execute(() ->
184 notifyDelegate(new KubevirtPortEvent(
185 KUBEVIRT_PORT_REMOVED, event.oldValue().value())));
Jian Li0b481122021-01-17 04:26:18 +0900186 }
187 break;
188 default:
189 // do nothing
190 break;
191 }
192 }
Jian Li8f944d42021-03-23 00:43:29 +0900193
194 private void processSecurityGroupEvent(KubevirtPort oldPort, KubevirtPort newPort) {
195 Set<String> oldSecurityGroups = oldPort.securityGroups() == null ?
196 ImmutableSet.of() : oldPort.securityGroups();
197 Set<String> newSecurityGroups = newPort.securityGroups() == null ?
198 ImmutableSet.of() : newPort.securityGroups();
199
200 oldSecurityGroups.stream()
201 .filter(sgId -> !Objects.requireNonNull(
202 newPort.securityGroups()).contains(sgId))
203 .forEach(sgId -> notifyDelegate(new KubevirtPortEvent(
204 KUBEVIRT_PORT_SECURITY_GROUP_REMOVED, newPort, sgId
205 )));
206
207 newSecurityGroups.stream()
208 .filter(sgId -> !oldPort.securityGroups().contains(sgId))
209 .forEach(sgId -> notifyDelegate(new KubevirtPortEvent(
210 KUBEVIRT_PORT_SECURITY_GROUP_ADDED, newPort, sgId
211 )));
212 }
213
214 private void processDeviceEvent(KubevirtPort oldPort, KubevirtPort newPort) {
215 DeviceId oldDeviceId = oldPort.deviceId();
216 DeviceId newDeviceId = newPort.deviceId();
217
218 if (oldDeviceId == null && newDeviceId != null) {
219 notifyDelegate(new KubevirtPortEvent(
220 KUBEVIRT_PORT_DEVICE_ADDED, newPort, newDeviceId
221 ));
222 }
Jian Li969abd82022-10-17 18:28:19 +0900223
224 if (oldDeviceId != null && newDeviceId != null && !oldDeviceId.equals(newDeviceId)) {
225 notifyDelegate(new KubevirtPortEvent(
226 KUBEVIRT_PORT_MIGRATED, newPort, oldPort, newDeviceId
227 ));
228 }
Jian Li8f944d42021-03-23 00:43:29 +0900229 }
Jian Li0b481122021-01-17 04:26:18 +0900230 }
231}