blob: 73dd7a4b555bc2836a046a449fffdeb7a6d43515 [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.packet.IpAddress;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.k8snode.api.DefaultK8sHost;
24import org.onosproject.k8snode.api.K8sHost;
25import org.onosproject.k8snode.api.K8sHostEvent;
26import org.onosproject.k8snode.api.K8sHostState;
27import org.onosproject.k8snode.api.K8sHostStore;
28import org.onosproject.k8snode.api.K8sHostStoreDelegate;
Jian Li077b07e2020-09-01 16:55:25 +090029import org.onosproject.k8snode.api.K8sTunnelBridge;
Jian Lie2a04ce2020-07-01 19:07:02 +090030import org.onosproject.store.AbstractStore;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.ConsistentMap;
33import org.onosproject.store.service.MapEvent;
34import org.onosproject.store.service.MapEventListener;
35import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.util.Collection;
46import java.util.HashSet;
47import java.util.Set;
48import java.util.concurrent.ExecutorService;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static java.util.concurrent.Executors.newSingleThreadExecutor;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_COMPLETE;
54import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_CREATED;
55import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_INCOMPLETE;
56import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_REMOVED;
57import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_UPDATED;
58import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_NODES_ADDED;
59import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_NODES_REMOVED;
60import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
61import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
62import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Implementation of kubernetes host store using consistent map.
66 */
67@Component(immediate = true, service = K8sHostStore.class)
68public class DistributedK8sHostStore
69 extends AbstractStore<K8sHostEvent, K8sHostStoreDelegate>
70 implements K8sHostStore {
71
72 private final Logger log = getLogger(getClass());
73
74 private static final String ERR_NOT_FOUND = " does not exist";
75 private static final String ERR_DUPLICATE = " already exists";
76 private static final String APP_ID = "org.onosproject.k8snode";
77
78 private static final KryoNamespace
79 SERIALIZER_K8S_HOST = KryoNamespace.newBuilder()
80 .register(KryoNamespaces.API)
81 .register(K8sHost.class)
82 .register(DefaultK8sHost.class)
83 .register(K8sHostState.class)
Jian Li077b07e2020-09-01 16:55:25 +090084 .register(K8sTunnelBridge.class)
Jian Lie2a04ce2020-07-01 19:07:02 +090085 .register(Collection.class)
86 .build();
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected CoreService coreService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected StorageService storageService;
93
94 private final ExecutorService eventExecutor = newSingleThreadExecutor(
95 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
96
97 private final MapEventListener<String, K8sHost> hostMapListener =
98 new K8sHostMapListener();
99 private ConsistentMap<String, K8sHost> hostStore;
100
101 @Activate
102 protected void activate() {
103 ApplicationId appId = coreService.registerApplication(APP_ID);
104 hostStore = storageService.<String, K8sHost>consistentMapBuilder()
105 .withSerializer(Serializer.using(SERIALIZER_K8S_HOST))
106 .withName("k8s-hoststore")
107 .withApplicationId(appId)
108 .build();
109 hostStore.addListener(hostMapListener);
110 log.info("Started");
111 }
112
113 @Deactivate
114 protected void deactivate() {
115 hostStore.removeListener(hostMapListener);
116 eventExecutor.shutdown();
117 log.info("Stopped");
118 }
119
120 @Override
121 public void createHost(K8sHost host) {
122 hostStore.compute(host.hostIp().toString(), (hostIp, existing) -> {
123 final String error = host.hostIp().toString() + ERR_DUPLICATE;
124 checkArgument(existing == null, error);
125 return host;
126 });
127 }
128
129 @Override
130 public void updateHost(K8sHost host) {
131 hostStore.compute(host.hostIp().toString(), (hostIp, existing) -> {
132 final String error = host.hostIp().toString() + ERR_NOT_FOUND;
133 checkArgument(existing != null, error);
134 return host;
135 });
136 }
137
138 @Override
139 public K8sHost removeHost(IpAddress hostIp) {
140 Versioned<K8sHost> host = hostStore.remove(hostIp.toString());
141 if (host == null) {
142 final String error = hostIp.toString() + ERR_NOT_FOUND;
143 throw new IllegalArgumentException(error);
144 }
145 return host.value();
146 }
147
148 @Override
149 public Set<K8sHost> hosts() {
150 return ImmutableSet.copyOf(hostStore.asJavaMap().values());
151 }
152
153 @Override
154 public K8sHost host(IpAddress hostIp) {
155 return hostStore.asJavaMap().get(hostIp.toString());
156 }
157
158 private class K8sHostMapListener
159 implements MapEventListener<String, K8sHost> {
160
161 @Override
162 public void event(MapEvent<String, K8sHost> event) {
163 switch (event.type()) {
164 case INSERT:
165 log.debug("Kubernetes host created {}", event.newValue());
166 eventExecutor.execute(() ->
167 notifyDelegate(new K8sHostEvent(
168 K8S_HOST_CREATED, event.newValue().value()
169 )));
170 break;
171 case UPDATE:
172 log.debug("Kubernetes host updated {}", event.newValue());
173 eventExecutor.execute(() -> {
174 notifyDelegate(new K8sHostEvent(
175 K8S_HOST_UPDATED, event.newValue().value()
176 ));
177
178 if (event.newValue().value().state() == COMPLETE) {
179 notifyDelegate(new K8sHostEvent(
180 K8S_HOST_COMPLETE,
181 event.newValue().value()
182 ));
183 } else if (event.newValue().value().state() == INCOMPLETE) {
184 notifyDelegate(new K8sHostEvent(
185 K8S_HOST_INCOMPLETE,
186 event.newValue().value()
187 ));
188 }
189
190 K8sHost origHost = event.newValue().value();
191 Set<String> oldNodes = event.oldValue().value().nodeNames();
192 Set<String> newNodes = event.newValue().value().nodeNames();
193
194 Set<String> addedNodes = new HashSet<>(newNodes);
195 Set<String> removedNodes = new HashSet<>(oldNodes);
196
197 addedNodes.removeAll(oldNodes);
198 removedNodes.removeAll(newNodes);
199
200 if (addedNodes.size() > 0) {
201 K8sHost addedHost = DefaultK8sHost.builder()
202 .hostIp(origHost.hostIp())
203 .state(origHost.state())
204 .nodeNames(addedNodes)
205 .build();
206 notifyDelegate(new K8sHostEvent(K8S_NODES_ADDED, addedHost));
207 }
208
209 if (removedNodes.size() > 0) {
210 K8sHost removedHost = DefaultK8sHost.builder()
211 .hostIp(origHost.hostIp())
212 .state(origHost.state())
213 .nodeNames(removedNodes)
214 .build();
215 notifyDelegate(new K8sHostEvent(K8S_NODES_REMOVED, removedHost));
216 }
217 });
218 break;
219 case REMOVE:
220 log.debug("Kubernetes host removed {}", event.oldValue());
221 eventExecutor.execute(() ->
222 notifyDelegate(new K8sHostEvent(
223 K8S_HOST_REMOVED, event.oldValue().value()
224 )));
225 break;
226 default:
227 // do nothing
228 break;
229 }
230 }
231 }
232}