blob: a60354a85e8570f7057cd3589e7ec89960bcabba [file] [log] [blame]
Jian Li324d6dc2019-07-10 10:55:15 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import io.fabric8.kubernetes.api.model.Namespace;
20import io.fabric8.kubernetes.api.model.NamespaceSpec;
21import io.fabric8.kubernetes.api.model.NamespaceStatus;
22import io.fabric8.kubernetes.api.model.ObjectMeta;
23import org.onlab.util.KryoNamespace;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.k8snetworking.api.K8sNamespaceEvent;
27import org.onosproject.k8snetworking.api.K8sNamespaceStore;
28import org.onosproject.k8snetworking.api.K8sNamespaceStoreDelegate;
29import org.onosproject.store.AbstractStore;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.ConsistentMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.util.Set;
45import java.util.concurrent.ExecutorService;
46
47import static com.google.common.base.Preconditions.checkArgument;
48import static java.util.concurrent.Executors.newSingleThreadExecutor;
49import static org.onlab.util.Tools.groupedThreads;
50import static org.onosproject.k8snetworking.api.K8sNamespaceEvent.Type.K8S_NAMESPACE_CREATED;
51import static org.onosproject.k8snetworking.api.K8sNamespaceEvent.Type.K8S_NAMESPACE_REMOVED;
52import static org.onosproject.k8snetworking.api.K8sNamespaceEvent.Type.K8S_NAMESPACE_UPDATED;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Implementation of kubernetes namespace store using consistent map.
57 */
58@Component(immediate = true, service = K8sNamespaceStore.class)
59public class DistributedK8sNamespaceStore
60 extends AbstractStore<K8sNamespaceEvent, K8sNamespaceStoreDelegate>
61 implements K8sNamespaceStore {
62
63 private final Logger log = getLogger(getClass());
64
65 private static final String ERR_NOT_FOUND = " does not exist";
66 private static final String ERR_DUPLICATE = " already exists";
67 private static final String APP_ID = "org.onosproject.k8snetwork";
68
69 private static final KryoNamespace
70 SERIALIZER_K8S_NAMESPACE = KryoNamespace.newBuilder()
71 .register(KryoNamespaces.API)
72 .register(Namespace.class)
73 .register(ObjectMeta.class)
74 .register(NamespaceSpec.class)
75 .register(NamespaceStatus.class)
76 .build();
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected CoreService coreService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected StorageService storageService;
83
84 private final ExecutorService eventExecutor = newSingleThreadExecutor(
85 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
86
87 private final MapEventListener<String, Namespace> namespaceMapListener =
88 new K8sNamespaceMapListener();
89
90 private ConsistentMap<String, Namespace> namespaceStore;
91
92 @Activate
93 protected void activate() {
94 ApplicationId appId = coreService.registerApplication(APP_ID);
95 namespaceStore = storageService.<String, Namespace>consistentMapBuilder()
96 .withSerializer(Serializer.using(SERIALIZER_K8S_NAMESPACE))
97 .withName("k8s-namespace-store")
98 .withApplicationId(appId)
99 .build();
100
101 namespaceStore.addListener(namespaceMapListener);
102 log.info("Started");
103 }
104
105 @Deactivate
106 protected void deactivate() {
107 namespaceStore.removeListener(namespaceMapListener);
108 eventExecutor.shutdown();
109 log.info("Stopped");
110 }
111
112 @Override
113 public void createNamespace(Namespace namespace) {
114 namespaceStore.compute(namespace.getMetadata().getUid(), (uid, existing) -> {
115 final String error = namespace.getMetadata().getUid() + ERR_DUPLICATE;
116 checkArgument(existing == null, error);
117 return namespace;
118 });
119 }
120
121 @Override
122 public void updateNamespace(Namespace namespace) {
123 namespaceStore.compute(namespace.getMetadata().getUid(), (uid, existing) -> {
124 final String error = namespace.getMetadata().getUid() + ERR_NOT_FOUND;
125 checkArgument(existing != null, error);
126 return namespace;
127 });
128 }
129
130 @Override
131 public Namespace removeNamespace(String uid) {
132 Versioned<Namespace> namespace = namespaceStore.remove(uid);
133 if (namespace == null) {
134 final String error = uid + ERR_NOT_FOUND;
135 throw new IllegalArgumentException(error);
136 }
137 return namespace.value();
138 }
139
140 @Override
141 public Namespace namespace(String uid) {
142 return namespaceStore.asJavaMap().get(uid);
143 }
144
145 @Override
146 public Set<Namespace> namespaces() {
147 return ImmutableSet.copyOf(namespaceStore.asJavaMap().values());
148 }
149
150 @Override
151 public void clear() {
152 namespaceStore.clear();
153 }
154
155 private class K8sNamespaceMapListener implements MapEventListener<String, Namespace> {
156
157 @Override
158 public void event(MapEvent<String, Namespace> event) {
159
160 switch (event.type()) {
161 case INSERT:
162 log.debug("Kubernetes namespace created {}", event.newValue());
163 eventExecutor.execute(() ->
164 notifyDelegate(new K8sNamespaceEvent(
165 K8S_NAMESPACE_CREATED, event.newValue().value())));
166 break;
167 case UPDATE:
168 log.debug("Kubernetes namespace updated {}", event.newValue());
169 eventExecutor.execute(() ->
170 notifyDelegate(new K8sNamespaceEvent(
171 K8S_NAMESPACE_UPDATED, event.newValue().value())));
172 break;
173 case REMOVE:
174 log.debug("Kubernetes namespace removed {}", event.oldValue());
175 eventExecutor.execute(() ->
176 notifyDelegate(new K8sNamespaceEvent(
177 K8S_NAMESPACE_REMOVED, event.oldValue().value())));
178 break;
179 default:
180 // do nothing
181 break;
182 }
183 }
184 }
185}