/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.util.KryoNamespace;
20import org.onosproject.core.ApplicationId;
21import org.onosproject.core.CoreService;
22import org.onosproject.k8snetworking.api.DefaultK8sIpam;
23import org.onosproject.k8snetworking.api.K8sIpam;
24import org.onosproject.k8snetworking.api.K8sIpamEvent;
25import org.onosproject.k8snetworking.api.K8sIpamStore;
26import org.onosproject.k8snetworking.api.K8sIpamStoreDelegate;
27import org.onosproject.store.AbstractStore;
28import org.onosproject.store.serializers.KryoNamespaces;
29import org.onosproject.store.service.ConsistentMap;
30import org.onosproject.store.service.Serializer;
31import org.onosproject.store.service.StorageService;
32import org.onosproject.store.service.Versioned;
33import org.osgi.service.component.annotations.Activate;
34import org.osgi.service.component.annotations.Component;
35import org.osgi.service.component.annotations.Deactivate;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
38import org.slf4j.Logger;
39
40import java.util.Collection;
41import java.util.Set;
42import java.util.concurrent.ExecutorService;
43import java.util.stream.Collectors;
44
45import static com.google.common.base.Preconditions.checkArgument;
46import static java.util.concurrent.Executors.newSingleThreadExecutor;
47import static org.onlab.util.Tools.groupedThreads;
48import static org.slf4j.LoggerFactory.getLogger;
49
50/**
51 * Implementation of kubernetes IP address management store using consistent map.
52 */
53@Component(immediate = true, service = K8sIpamStore.class)
54public class DistributedK8sIpamStore
55 extends AbstractStore<K8sIpamEvent, K8sIpamStoreDelegate>
56 implements K8sIpamStore {
57
58 private final Logger log = getLogger(getClass());
59
60 private static final String ERR_NOT_FOUND = " does not exist";
61 private static final String ERR_DUPLICATE = " already exists";
62 private static final String APP_ID = "org.onosproject.k8snetwork";
63
64 private static final KryoNamespace
65 SERIALIZER_K8S_IPAM = KryoNamespace.newBuilder()
66 .register(KryoNamespaces.API)
67 .register(K8sIpam.class)
68 .register(DefaultK8sIpam.class)
69 .register(Collection.class)
70 .build();
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected CoreService coreService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected StorageService storageService;
77
78 private final ExecutorService eventExecutor = newSingleThreadExecutor(
79 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
80
81 private ConsistentMap<String, K8sIpam> allocatedStore;
82 private ConsistentMap<String, K8sIpam> availableStore;
83
84 @Activate
85 protected void activate() {
86 ApplicationId appId = coreService.registerApplication(APP_ID);
87 allocatedStore = storageService.<String, K8sIpam>consistentMapBuilder()
88 .withSerializer(Serializer.using(SERIALIZER_K8S_IPAM))
89 .withName("k8s-ipam-allocated-store")
90 .withApplicationId(appId)
91 .build();
92 availableStore = storageService.<String, K8sIpam>consistentMapBuilder()
93 .withSerializer(Serializer.using(SERIALIZER_K8S_IPAM))
94 .withName("k8s-ipam-available-store")
95 .withApplicationId(appId)
96 .build();
97 log.info("Started");
98 }
99
100 @Deactivate
101 protected void deactivate() {
102 eventExecutor.shutdown();
103 log.info("Stopped");
104 }
105
106 @Override
107 public void createAllocatedIp(K8sIpam ipam) {
108 allocatedStore.compute(ipam.ipamId(), (ipamId, existing) -> {
109 final String error = ipam.ipamId() + ERR_DUPLICATE;
110 checkArgument(existing == null, error);
111 return ipam;
112 });
113 }
114
115 @Override
116 public void updateAllocatedIp(K8sIpam ipam) {
117 allocatedStore.compute(ipam.ipamId(), (ipamId, existing) -> {
118 final String error = ipam.ipamId() + ERR_NOT_FOUND;
119 checkArgument(existing != null, error);
120 return ipam;
121 });
122 }
123
124 @Override
125 public K8sIpam removeAllocatedIp(String ipamId) {
126 Versioned<K8sIpam> ipam = allocatedStore.remove(ipamId);
127 if (ipam == null) {
128 final String error = ipamId + ERR_NOT_FOUND;
129 throw new IllegalArgumentException(error);
130 }
131 return ipam.value();
132 }
133
134 @Override
135 public K8sIpam allocatedIp(String ipamId) {
136 return allocatedStore.asJavaMap().get(ipamId);
137 }
138
139 @Override
140 public Set<K8sIpam> allocatedIps() {
141 return ImmutableSet.copyOf(allocatedStore.asJavaMap().values());
142 }
143
144 @Override
145 public void createAvailableIp(K8sIpam ipam) {
146 availableStore.compute(ipam.ipamId(), (ipamId, existing) -> {
147 final String error = ipam.ipamId() + ERR_DUPLICATE;
148 checkArgument(existing == null, error);
149 return ipam;
150 });
151 }
152
153 @Override
154 public void updateAvailableIp(K8sIpam ipam) {
155 availableStore.compute(ipam.ipamId(), (ipamId, existing) -> {
156 final String error = ipam.ipamId() + ERR_NOT_FOUND;
157 checkArgument(existing != null, error);
158 return ipam;
159 });
160 }
161
162 @Override
163 public K8sIpam removeAvailableIp(String ipamId) {
164 Versioned<K8sIpam> ipam = availableStore.remove(ipamId);
165 if (ipam == null) {
166 final String error = ipamId + ERR_NOT_FOUND;
167 throw new IllegalArgumentException(error);
168 }
169 return ipam.value();
170 }
171
172 @Override
173 public K8sIpam availableIp(String ipamId) {
174 return availableStore.asJavaMap().get(ipamId);
175 }
176
177 @Override
178 public Set<K8sIpam> availableIps() {
179 return ImmutableSet.copyOf(availableStore.asJavaMap().values());
180 }
181
182 @Override
183 public void clear() {
184 allocatedStore.clear();
185 availableStore.clear();
186 }
187
188 @Override
189 public void clear(String networkId) {
190 Set<K8sIpam> ipams = allocatedStore.asJavaMap().values().stream()
191 .filter(i -> i.networkId().equals(networkId))
192 .collect(Collectors.toSet());
193 ipams.forEach(i -> allocatedStore.remove(i.ipamId()));
194 }
195}