blob: 301aa0e9958c9cf89a48e50ad0e1bd5d64ec9580 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.util.KryoNamespace;
20import org.onosproject.core.ApplicationId;
21import org.onosproject.core.CoreService;
22import org.onosproject.k8snetworking.api.DefaultK8sNetwork;
23import org.onosproject.k8snetworking.api.DefaultK8sPort;
24import org.onosproject.k8snetworking.api.K8sNetwork;
25import org.onosproject.k8snetworking.api.K8sNetworkEvent;
26import org.onosproject.k8snetworking.api.K8sNetworkStore;
27import org.onosproject.k8snetworking.api.K8sNetworkStoreDelegate;
28import org.onosproject.k8snetworking.api.K8sPort;
29import org.onosproject.store.AbstractStore;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.ConsistentMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.util.Collection;
45import java.util.Set;
46import java.util.concurrent.ExecutorService;
47
48import static com.google.common.base.Preconditions.checkArgument;
49import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_NETWORK_CREATED;
52import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_NETWORK_REMOVED;
53import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_NETWORK_UPDATED;
Jian Li4aa17642019-01-30 00:01:11 +090054import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_ACTIVATED;
Jian Li7e8f57e2019-01-24 18:31:03 +090055import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_CREATED;
Jian Li4aa17642019-01-30 00:01:11 +090056import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_INACTIVATED;
Jian Li7e8f57e2019-01-24 18:31:03 +090057import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_REMOVED;
58import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_UPDATED;
Jian Li4aa17642019-01-30 00:01:11 +090059import static org.onosproject.k8snetworking.api.K8sPort.State.ACTIVE;
60import static org.onosproject.k8snetworking.api.K8sPort.State.INACTIVE;
Jian Li7e8f57e2019-01-24 18:31:03 +090061import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Implementation of kubernetes network store using consistent map.
65 */
66@Component(immediate = true, service = K8sNetworkStore.class)
67public class DistributedK8sNetworkStore
68 extends AbstractStore<K8sNetworkEvent, K8sNetworkStoreDelegate>
69 implements K8sNetworkStore {
70
71 private final Logger log = getLogger(getClass());
72
73 private static final String ERR_NOT_FOUND = " does not exist";
74 private static final String ERR_DUPLICATE = " already exists";
75 private static final String APP_ID = "org.onosproject.k8snetwork";
76
77 private static final KryoNamespace
78 SERIALIZER_K8S_NETWORK_PORT = KryoNamespace.newBuilder()
79 .register(KryoNamespaces.API)
80 .register(K8sNetwork.class)
81 .register(K8sNetwork.Type.class)
82 .register(DefaultK8sNetwork.class)
83 .register(K8sPort.class)
84 .register(K8sPort.State.class)
85 .register(DefaultK8sPort.class)
86 .register(Collection.class)
87 .build();
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected CoreService coreService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected StorageService storageService;
94
95 private final ExecutorService eventExecutor = newSingleThreadExecutor(
96 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
97
98 private final MapEventListener<String, K8sNetwork> networkMapListener =
99 new K8sNetworkMapListener();
100 private final MapEventListener<String, K8sPort> portMapListener =
101 new K8sPortMapListener();
102
103 private ConsistentMap<String, K8sNetwork> networkStore;
104 private ConsistentMap<String, K8sPort> portStore;
105
106 @Activate
107 protected void activate() {
108 ApplicationId appId = coreService.registerApplication(APP_ID);
109 networkStore = storageService.<String, K8sNetwork>consistentMapBuilder()
110 .withSerializer(Serializer.using(SERIALIZER_K8S_NETWORK_PORT))
111 .withName("k8s-networkstore")
112 .withApplicationId(appId)
113 .build();
114 portStore = storageService.<String, K8sPort>consistentMapBuilder()
115 .withSerializer(Serializer.using(SERIALIZER_K8S_NETWORK_PORT))
116 .withName("k8s-portstore")
117 .withApplicationId(appId)
118 .build();
119 networkStore.addListener(networkMapListener);
120 portStore.addListener(portMapListener);
121 log.info("Started");
122 }
123
124 @Deactivate
125 protected void deactivate() {
126 networkStore.removeListener(networkMapListener);
127 portStore.removeListener(portMapListener);
128 eventExecutor.shutdown();
129 log.info("Stopped");
130 }
131
132 @Override
133 public void createNetwork(K8sNetwork network) {
134 networkStore.compute(network.networkId(), (networkId, existing) -> {
135 final String error = network.networkId() + ERR_DUPLICATE;
136 checkArgument(existing == null, error);
137 return network;
138 });
139 }
140
141 @Override
142 public void updateNetwork(K8sNetwork network) {
143 networkStore.compute(network.networkId(), (networkId, existing) -> {
144 final String error = network.networkId() + ERR_NOT_FOUND;
145 checkArgument(existing != null, error);
146 return network;
147 });
148 }
149
150 @Override
151 public K8sNetwork removeNetwork(String networkId) {
152 Versioned<K8sNetwork> network = networkStore.remove(networkId);
153 if (network == null) {
154 final String error = networkId + ERR_NOT_FOUND;
155 throw new IllegalArgumentException(error);
156 }
157 return network.value();
158 }
159
160 @Override
161 public K8sNetwork network(String networkId) {
162 return networkStore.asJavaMap().get(networkId);
163 }
164
165 @Override
166 public Set<K8sNetwork> networks() {
167 return ImmutableSet.copyOf(networkStore.asJavaMap().values());
168 }
169
170 @Override
171 public void createPort(K8sPort port) {
172 portStore.compute(port.portId(), (portId, existing) -> {
173 final String error = port.portId() + ERR_DUPLICATE;
174 checkArgument(existing == null, error);
175 return port;
176 });
177 }
178
179 @Override
180 public void updatePort(K8sPort port) {
181 portStore.compute(port.portId(), (portId, existing) -> {
182 final String error = port.portId() + ERR_NOT_FOUND;
183 checkArgument(existing != null, error);
184 return port;
185 });
186 }
187
188 @Override
189 public K8sPort removePort(String portId) {
190 Versioned<K8sPort> port = portStore.remove(portId);
191 if (port == null) {
192 final String error = portId + ERR_NOT_FOUND;
193 throw new IllegalArgumentException(error);
194 }
195 return port.value();
196 }
197
198 @Override
199 public Set<K8sPort> ports() {
200 return ImmutableSet.copyOf(portStore.asJavaMap().values());
201 }
202
203 @Override
204 public K8sPort port(String portId) {
205 return portStore.asJavaMap().get(portId);
206 }
207
208 @Override
209 public void clear() {
210 portStore.clear();
211 networkStore.clear();
212 }
213
214 private class K8sNetworkMapListener implements MapEventListener<String, K8sNetwork> {
215
216 @Override
217 public void event(MapEvent<String, K8sNetwork> event) {
218
219 switch (event.type()) {
220 case INSERT:
221 log.debug("Kubernetes network created {}", event.newValue());
222 eventExecutor.execute(() ->
223 notifyDelegate(new K8sNetworkEvent(
224 K8S_NETWORK_CREATED, event.newValue().value())));
225 break;
226 case UPDATE:
227 log.debug("Kubernetes network updated {}", event.newValue());
228 eventExecutor.execute(() ->
229 notifyDelegate(new K8sNetworkEvent(
230 K8S_NETWORK_UPDATED, event.newValue().value())));
231 break;
232 case REMOVE:
233 log.debug("Kubernetes network removed {}", event.oldValue());
234 eventExecutor.execute(() ->
235 notifyDelegate(new K8sNetworkEvent(
236 K8S_NETWORK_REMOVED, event.oldValue().value())));
237 break;
238 default:
239 // do nothing
240 break;
241 }
242 }
243 }
244
245 private class K8sPortMapListener implements MapEventListener<String, K8sPort> {
246
247 @Override
248 public void event(MapEvent<String, K8sPort> event) {
249
250 switch (event.type()) {
251 case INSERT:
252 log.debug("Kubernetes port created");
253 eventExecutor.execute(() ->
254 notifyDelegate(new K8sNetworkEvent(
255 K8S_PORT_CREATED,
256 network(event.newValue().value().networkId()),
257 event.newValue().value())));
258 break;
259 case UPDATE:
260 log.debug("Kubernetes port updated");
Jian Li4aa17642019-01-30 00:01:11 +0900261 eventExecutor.execute(() -> processPortUpdate(event));
Jian Li7e8f57e2019-01-24 18:31:03 +0900262 break;
263 case REMOVE:
264 log.debug("Kubernetes port removed");
Jian Li970c6e52019-12-24 21:39:33 +0900265
266 // if the event object has invalid port value, we do not
267 // propagate K8S_PORT_REMOVED event.
268 if (event.oldValue() != null &&
269 event.oldValue().value() != null) {
270 notifyDelegate(new K8sNetworkEvent(
271 K8S_PORT_REMOVED,
272 network(event.oldValue().value().networkId()),
273 event.oldValue().value()));
274 }
275
Jian Li7e8f57e2019-01-24 18:31:03 +0900276 break;
277 default:
278 // do nothing
279 break;
280 }
281 }
Jian Li4aa17642019-01-30 00:01:11 +0900282
283 private void processPortUpdate(MapEvent<String, K8sPort> event) {
284 K8sPort.State oldState = event.oldValue().value().state();
285 K8sPort.State newState = event.newValue().value().state();
286
287 eventExecutor.execute(() ->
288 notifyDelegate(new K8sNetworkEvent(
289 K8S_PORT_UPDATED,
290 network(event.newValue().value().networkId()),
291 event.newValue().value())));
292
293 if (oldState == INACTIVE && newState == ACTIVE) {
294 notifyDelegate(new K8sNetworkEvent(
295 K8S_PORT_ACTIVATED,
296 network(event.newValue().value().networkId()),
297 event.newValue().value()));
298 }
299
300 if (oldState == ACTIVE && newState == INACTIVE) {
301 notifyDelegate(new K8sNetworkEvent(
302 K8S_PORT_INACTIVATED,
303 network(event.newValue().value().networkId()),
304 event.newValue().value()));
305 }
306 }
Jian Li7e8f57e2019-01-24 18:31:03 +0900307 }
308}