blob: 15461dfb63900171ba2e71c16c2c6cb77bb26a1a [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.packet.MacAddress;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
24import org.onosproject.kubevirtnetworking.api.KubevirtPort;
25import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
26import org.onosproject.kubevirtnetworking.api.KubevirtPortStore;
27import org.onosproject.kubevirtnetworking.api.KubevirtPortStoreDelegate;
Jian Li8f944d42021-03-23 00:43:29 +090028import org.onosproject.net.DeviceId;
Jian Li0b481122021-01-17 04:26:18 +090029import org.onosproject.store.AbstractStore;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.ConsistentMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.util.Collection;
Jian Li8f944d42021-03-23 00:43:29 +090045import java.util.Objects;
Jian Li0b481122021-01-17 04:26:18 +090046import java.util.Set;
47import java.util.concurrent.ExecutorService;
48
49import static com.google.common.base.Preconditions.checkArgument;
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_CREATED;
Jian Li8f944d42021-03-23 00:43:29 +090053import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_DEVICE_ADDED;
Jian Li0b481122021-01-17 04:26:18 +090054import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_REMOVED;
Jian Li8f944d42021-03-23 00:43:29 +090055import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_SECURITY_GROUP_ADDED;
56import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_SECURITY_GROUP_REMOVED;
Jian Li0b481122021-01-17 04:26:18 +090057import static org.onosproject.kubevirtnetworking.api.KubevirtPortEvent.Type.KUBEVIRT_PORT_UPDATED;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Implementation of kubevirt pod store using consistent map.
62 */
63@Component(immediate = true, service = KubevirtPortStore.class)
64public class DistributedKubevirtPortStore
65 extends AbstractStore<KubevirtPortEvent, KubevirtPortStoreDelegate>
66 implements KubevirtPortStore {
67
68 private final Logger log = getLogger(getClass());
69
70 private static final String ERR_NOT_FOUND = " does not exist";
71 private static final String ERR_DUPLICATE = " already exists";
72 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
73
74 private static final KryoNamespace
75 SERIALIZER_KUBEVIRT_PORT = KryoNamespace.newBuilder()
76 .register(KryoNamespaces.API)
77 .register(KubevirtPort.class)
78 .register(DefaultKubevirtPort.class)
79 .register(Collection.class)
80 .build();
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected CoreService coreService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected StorageService storageService;
87
88 private final ExecutorService eventExecutor = newSingleThreadExecutor(
89 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
90
91 private final MapEventListener<String, KubevirtPort> portMapListener =
92 new KubevirtPortMapListener();
93
94 private ConsistentMap<String, KubevirtPort> portStore;
95
96 @Activate
97 protected void activate() {
98 ApplicationId appId = coreService.registerApplication(APP_ID);
99 portStore = storageService.<String, KubevirtPort>consistentMapBuilder()
100 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_PORT))
101 .withName("kubevirt-portstore")
102 .withApplicationId(appId)
103 .build();
104 portStore.addListener(portMapListener);
105 log.info("Started");
106 }
107
108 @Deactivate
109 protected void deactivate() {
110 portStore.removeListener(portMapListener);
111 eventExecutor.shutdown();
112 log.info("Stopped");
113 }
114
115 @Override
116 public void createPort(KubevirtPort port) {
117 portStore.compute(port.macAddress().toString(), (mac, existing) -> {
118 final String error = port.macAddress().toString() + ERR_DUPLICATE;
119 checkArgument(existing == null, error);
120 return port;
121 });
122 }
123
124 @Override
125 public void updatePort(KubevirtPort port) {
126 portStore.compute(port.macAddress().toString(), (mac, existing) -> {
127 final String error = port.macAddress().toString() + ERR_NOT_FOUND;
128 checkArgument(existing != null, error);
129 return port;
130 });
131 }
132
133 @Override
134 public KubevirtPort removePort(MacAddress mac) {
135 Versioned<KubevirtPort> port = portStore.remove(mac.toString());
136 if (port == null) {
137 final String error = mac.toString() + ERR_NOT_FOUND;
138 throw new IllegalArgumentException(error);
139 }
140 return port.value();
141 }
142
143 @Override
144 public KubevirtPort port(MacAddress mac) {
145 return portStore.asJavaMap().get(mac.toString());
146 }
147
148 @Override
149 public Set<KubevirtPort> ports() {
150 return ImmutableSet.copyOf(portStore.asJavaMap().values());
151 }
152
153 @Override
154 public void clear() {
155 portStore.clear();
156 }
157
158 private class KubevirtPortMapListener implements MapEventListener<String, KubevirtPort> {
159
160 @Override
161 public void event(MapEvent<String, KubevirtPort> event) {
162 switch (event.type()) {
163 case INSERT:
164 log.debug("Kubevirt port created");
165 eventExecutor.execute(() ->
166 notifyDelegate(new KubevirtPortEvent(
167 KUBEVIRT_PORT_CREATED, event.newValue().value())));
168 break;
169 case UPDATE:
170 log.debug("Kubevirt port updated");
171 eventExecutor.execute(() ->
172 notifyDelegate(new KubevirtPortEvent(
173 KUBEVIRT_PORT_UPDATED, event.newValue().value())));
Jian Li8f944d42021-03-23 00:43:29 +0900174 processSecurityGroupEvent(event.oldValue().value(), event.newValue().value());
175 processDeviceEvent(event.oldValue().value(), event.newValue().value());
Jian Li0b481122021-01-17 04:26:18 +0900176 break;
177 case REMOVE:
178 log.debug("Kubevirt port removed");
179 // if the event object has invalid port value, we do not
180 // propagate KUBEVIRT_PORT_REMOVED event.
181 if (event.oldValue() != null && event.oldValue().value() != null) {
Jian Li810f58c2021-02-27 01:10:50 +0900182 eventExecutor.execute(() ->
183 notifyDelegate(new KubevirtPortEvent(
184 KUBEVIRT_PORT_REMOVED, event.oldValue().value())));
Jian Li0b481122021-01-17 04:26:18 +0900185 }
186 break;
187 default:
188 // do nothing
189 break;
190 }
191 }
Jian Li8f944d42021-03-23 00:43:29 +0900192
193 private void processSecurityGroupEvent(KubevirtPort oldPort, KubevirtPort newPort) {
194 Set<String> oldSecurityGroups = oldPort.securityGroups() == null ?
195 ImmutableSet.of() : oldPort.securityGroups();
196 Set<String> newSecurityGroups = newPort.securityGroups() == null ?
197 ImmutableSet.of() : newPort.securityGroups();
198
199 oldSecurityGroups.stream()
200 .filter(sgId -> !Objects.requireNonNull(
201 newPort.securityGroups()).contains(sgId))
202 .forEach(sgId -> notifyDelegate(new KubevirtPortEvent(
203 KUBEVIRT_PORT_SECURITY_GROUP_REMOVED, newPort, sgId
204 )));
205
206 newSecurityGroups.stream()
207 .filter(sgId -> !oldPort.securityGroups().contains(sgId))
208 .forEach(sgId -> notifyDelegate(new KubevirtPortEvent(
209 KUBEVIRT_PORT_SECURITY_GROUP_ADDED, newPort, sgId
210 )));
211 }
212
213 private void processDeviceEvent(KubevirtPort oldPort, KubevirtPort newPort) {
214 DeviceId oldDeviceId = oldPort.deviceId();
215 DeviceId newDeviceId = newPort.deviceId();
216
217 if (oldDeviceId == null && newDeviceId != null) {
218 notifyDelegate(new KubevirtPortEvent(
219 KUBEVIRT_PORT_DEVICE_ADDED, newPort, newDeviceId
220 ));
221 }
222 }
Jian Li0b481122021-01-17 04:26:18 +0900223 }
224}