blob: 044d9aeec2d39d9aa8ae5a3537dc91ef5b8cebc0 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.packet.IpAddress;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Lie2a04ce2020-07-01 19:07:02 +090023import org.onosproject.k8snode.api.DefaultHostNodesInfo;
Jian Li3defa842019-02-12 00:31:35 +090024import org.onosproject.k8snode.api.DefaultK8sApiConfig;
Jian Lie2a04ce2020-07-01 19:07:02 +090025import org.onosproject.k8snode.api.HostNodesInfo;
Jian Li3defa842019-02-12 00:31:35 +090026import org.onosproject.k8snode.api.K8sApiConfig;
27import org.onosproject.k8snode.api.K8sApiConfigEvent;
28import org.onosproject.k8snode.api.K8sApiConfigStore;
29import org.onosproject.k8snode.api.K8sApiConfigStoreDelegate;
30import org.onosproject.store.AbstractStore;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.ConsistentMap;
33import org.onosproject.store.service.MapEvent;
34import org.onosproject.store.service.MapEventListener;
35import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.util.Collection;
46import java.util.Set;
47import java.util.concurrent.ExecutorService;
48
49import static com.google.common.base.Preconditions.checkArgument;
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.k8snode.api.K8sApiConfigEvent.Type.K8S_API_CONFIG_CREATED;
53import static org.onosproject.k8snode.api.K8sApiConfigEvent.Type.K8S_API_CONFIG_REMOVED;
54import static org.onosproject.k8snode.api.K8sApiConfigEvent.Type.K8S_API_CONFIG_UPDATED;
55import static org.onosproject.k8snode.util.K8sNodeUtil.endpoint;
56import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * Implementation of kubernetes API config store using consistent map.
60 */
61@Component(immediate = true, service = K8sApiConfigStore.class)
62public class DistributedK8sApiConfigStore
63 extends AbstractStore<K8sApiConfigEvent, K8sApiConfigStoreDelegate>
64 implements K8sApiConfigStore {
65
66 private final Logger log = getLogger(getClass());
67
68 private static final String ERR_NOT_FOUND = " does not exist";
69 private static final String ERR_DUPLICATE = " already exists";
70 private static final String APP_ID = "org.onosproject.k8snode";
71
72 private static final KryoNamespace
73 SERIALIZER_K8S_API_CONFIG = KryoNamespace.newBuilder()
74 .register(KryoNamespaces.API)
75 .register(K8sApiConfig.class)
76 .register(DefaultK8sApiConfig.class)
Jian Lie2a04ce2020-07-01 19:07:02 +090077 .register(K8sApiConfig.Mode.class)
Jian Li3defa842019-02-12 00:31:35 +090078 .register(K8sApiConfig.Scheme.class)
Jian Li1cee9882019-02-13 11:25:25 +090079 .register(K8sApiConfig.State.class)
Jian Lie2a04ce2020-07-01 19:07:02 +090080 .register(HostNodesInfo.class)
81 .register(DefaultHostNodesInfo.class)
Jian Li3defa842019-02-12 00:31:35 +090082 .register(Collection.class)
83 .build();
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected CoreService coreService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected StorageService storageService;
90
91 private final ExecutorService eventExecutor = newSingleThreadExecutor(
92 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
93
94 private final MapEventListener<String, K8sApiConfig> apiConfigMapListener =
95 new K8sApiConfigMapListener();
96 private ConsistentMap<String, K8sApiConfig> apiConfigStore;
97
98 @Activate
99 protected void activate() {
100 ApplicationId appId = coreService.registerApplication(APP_ID);
101 apiConfigStore = storageService.<String, K8sApiConfig>consistentMapBuilder()
102 .withSerializer(Serializer.using(SERIALIZER_K8S_API_CONFIG))
103 .withName("k8s-apiconfig-store")
104 .withApplicationId(appId)
105 .build();
106 apiConfigStore.addListener(apiConfigMapListener);
107 log.info("Started");
108 }
109
110 @Deactivate
111 protected void deactivate() {
112 apiConfigStore.removeListener(apiConfigMapListener);
113 eventExecutor.shutdown();
114 log.info("Stopped");
115 }
116
117 @Override
118 public void createApiConfig(K8sApiConfig config) {
119 String key = endpoint(config);
120 apiConfigStore.compute(key, (endpoint, existing) -> {
121 final String error = key + ERR_DUPLICATE;
122 checkArgument(existing == null, error);
123 return config;
124 });
125 }
126
127 @Override
128 public void updateApiConfig(K8sApiConfig config) {
129 String key = endpoint(config);
130 apiConfigStore.compute(key, (endpoint, existing) -> {
131 final String error = key + ERR_NOT_FOUND;
132 checkArgument(existing != null, error);
133 return config;
134 });
135 }
136
137 @Override
138 public K8sApiConfig removeApiConfig(String endpoint) {
139 Versioned<K8sApiConfig> apiConfig = apiConfigStore.remove(endpoint);
140 if (apiConfig == null) {
141 final String error = endpoint + ERR_NOT_FOUND;
142 throw new IllegalArgumentException(error);
143 }
144 return apiConfig.value();
145 }
146
147 @Override
148 public K8sApiConfig removeApiConfig(K8sApiConfig.Scheme scheme,
149 IpAddress ipAddress, int port) {
150 String key = endpoint(scheme, ipAddress, port);
151 return removeApiConfig(key);
152 }
153
154 @Override
155 public Set<K8sApiConfig> apiConfigs() {
156 return ImmutableSet.copyOf(apiConfigStore.asJavaMap().values());
157 }
158
159 @Override
160 public K8sApiConfig apiConfig(String endpoint) {
161 return apiConfigStore.asJavaMap().get(endpoint);
162 }
163
164 @Override
165 public K8sApiConfig apiConfig(K8sApiConfig.Scheme scheme,
166 IpAddress ipAddress, int port) {
167 String key = endpoint(scheme, ipAddress, port);
168 return apiConfig(key);
169 }
170
171 private class K8sApiConfigMapListener
172 implements MapEventListener<String, K8sApiConfig> {
173
174 @Override
175 public void event(MapEvent<String, K8sApiConfig> event) {
176 switch (event.type()) {
177 case INSERT:
178 log.debug("Kubernetes API config created {}", event.newValue());
179 eventExecutor.execute(() ->
180 notifyDelegate(new K8sApiConfigEvent(
181 K8S_API_CONFIG_CREATED, event.newValue().value()
182 )));
183 break;
184 case UPDATE:
185 log.debug("Kubernetes API config updated {}", event.newValue());
186 eventExecutor.execute(() ->
187 notifyDelegate(new K8sApiConfigEvent(
188 K8S_API_CONFIG_UPDATED, event.newValue().value()
189 )));
190 break;
191 case REMOVE:
192 log.debug("Kubernetes API config removed {}", event.oldValue());
193 eventExecutor.execute(() ->
194 notifyDelegate(new K8sApiConfigEvent(
195 K8S_API_CONFIG_REMOVED, event.oldValue().value()
196 )));
197 break;
198 default:
199 // do nothing
200 break;
201 }
202 }
203 }
204}