blob: 1c045f32823977e968cf5b39bd98b9b3f6e9a2d3 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Daniel Park05a94582021-05-12 10:57:02 +090019import org.onlab.packet.IpAddress;
Jian Lidaa7d6a2021-04-13 17:22:56 +090020import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.kubevirtnetworking.api.DefaultKubevirtLoadBalancer;
24import org.onosproject.kubevirtnetworking.api.DefaultKubevirtLoadBalancerRule;
25import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancer;
26import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent;
27import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerRule;
28import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerStore;
29import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerStoreDelegate;
30import org.onosproject.store.AbstractStore;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.ConsistentMap;
33import org.onosproject.store.service.MapEvent;
34import org.onosproject.store.service.MapEventListener;
35import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.util.Collection;
Daniel Park05a94582021-05-12 10:57:02 +090046import java.util.HashSet;
Jian Lidaa7d6a2021-04-13 17:22:56 +090047import java.util.Set;
48import java.util.concurrent.ExecutorService;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static java.util.concurrent.Executors.newSingleThreadExecutor;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent.Type.KUBEVIRT_LOAD_BALANCER_CREATED;
Daniel Park05a94582021-05-12 10:57:02 +090054import static org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent.Type.KUBEVIRT_LOAD_BALANCER_MEMBER_ADDED;
55import static org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent.Type.KUBEVIRT_LOAD_BALANCER_MEMBER_REMOVED;
Jian Lidaa7d6a2021-04-13 17:22:56 +090056import static org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent.Type.KUBEVIRT_LOAD_BALANCER_REMOVED;
57import static org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent.Type.KUBEVIRT_LOAD_BALANCER_UPDATED;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Implementation of kubevirt load balancer store using consistent map.
62 */
63@Component(immediate = true, service = KubevirtLoadBalancerStore.class)
64public class DistributedKubevirtLoadBalancerStore
65 extends AbstractStore<KubevirtLoadBalancerEvent, KubevirtLoadBalancerStoreDelegate>
66 implements KubevirtLoadBalancerStore {
67
68 private final Logger log = getLogger(getClass());
69
70 private static final String ERR_NOT_FOUND = " does not exist";
71 private static final String ERR_DUPLICATE = " already exists";
72
73 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
74
75 private static final KryoNamespace
76 SERIALIZER_KUBEVIRT_LOAD_BALANCER = KryoNamespace.newBuilder()
77 .register(KryoNamespaces.API)
78 .register(KubevirtLoadBalancer.class)
79 .register(DefaultKubevirtLoadBalancer.class)
80 .register(KubevirtLoadBalancerRule.class)
81 .register(DefaultKubevirtLoadBalancerRule.class)
82 .register(Collection.class)
83 .build();
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected CoreService coreService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected StorageService storageService;
90
91 private final ExecutorService eventExecutor = newSingleThreadExecutor(
92 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
93
94 private final MapEventListener<String, KubevirtLoadBalancer> loadBalancerMapListener =
95 new KubevirtLoadBalancerMapListener();
96
97 private ConsistentMap<String, KubevirtLoadBalancer> loadBalancerStore;
98
99 @Activate
100 protected void activate() {
101 ApplicationId appId = coreService.registerApplication(APP_ID);
102 loadBalancerStore = storageService.<String, KubevirtLoadBalancer>consistentMapBuilder()
103 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_LOAD_BALANCER))
104 .withName("kubevirt-loadbalancerstore")
105 .withApplicationId(appId)
106 .build();
107 loadBalancerStore.addListener(loadBalancerMapListener);
108 log.info("Started");
109 }
110
111 @Deactivate
112 protected void deactivate() {
113 loadBalancerStore.removeListener(loadBalancerMapListener);
114 eventExecutor.shutdown();
115 log.info("Stopped");
116 }
117
118 @Override
119 public void createLoadBalancer(KubevirtLoadBalancer lb) {
120 loadBalancerStore.compute(lb.name(), (name, existing) -> {
121 final String error = lb.name() + ERR_DUPLICATE;
122 checkArgument(existing == null, error);
123 return lb;
124 });
125 }
126
127 @Override
128 public void updateLoadBalancer(KubevirtLoadBalancer lb) {
129 loadBalancerStore.compute(lb.name(), (name, existing) -> {
130 final String error = lb.name() + ERR_NOT_FOUND;
131 checkArgument(existing != null, error);
132 return lb;
133 });
134 }
135
136 @Override
137 public KubevirtLoadBalancer removeLoadBalancer(String name) {
138 Versioned<KubevirtLoadBalancer> lb = loadBalancerStore.remove(name);
139 if (lb == null) {
140 final String error = name + ERR_NOT_FOUND;
141 throw new IllegalArgumentException(error);
142 }
143 return lb.value();
144 }
145
146 @Override
147 public KubevirtLoadBalancer loadBalancer(String name) {
148 return loadBalancerStore.asJavaMap().get(name);
149 }
150
151 @Override
152 public Set<KubevirtLoadBalancer> loadBalancers() {
153 return ImmutableSet.copyOf(loadBalancerStore.asJavaMap().values());
154 }
155
156 @Override
157 public void clear() {
158 loadBalancerStore.clear();
159 }
160
161 private class KubevirtLoadBalancerMapListener
162 implements MapEventListener<String, KubevirtLoadBalancer> {
163
164 @Override
165 public void event(MapEvent<String, KubevirtLoadBalancer> event) {
166 switch (event.type()) {
167 case INSERT:
168 log.debug("Kubevirt load balancer created");
169 eventExecutor.execute(() ->
170 notifyDelegate(new KubevirtLoadBalancerEvent(
171 KUBEVIRT_LOAD_BALANCER_CREATED, event.newValue().value())));
172 break;
173 case UPDATE:
174 log.debug("Kubevirt load balancer updated");
Daniel Park05a94582021-05-12 10:57:02 +0900175 eventExecutor.execute(() -> processLoadBalancerMapUpdate(event));
Jian Lidaa7d6a2021-04-13 17:22:56 +0900176 break;
177 case REMOVE:
178 log.debug("Kubevirt load balancer removed");
179 eventExecutor.execute(() ->
180 notifyDelegate(new KubevirtLoadBalancerEvent(
181 KUBEVIRT_LOAD_BALANCER_REMOVED, event.oldValue().value())));
182 break;
183 default:
184 // do nothing
185 break;
186 }
187 }
Daniel Park05a94582021-05-12 10:57:02 +0900188
189 private void processLoadBalancerMapUpdate(MapEvent<String, KubevirtLoadBalancer> event) {
190 KubevirtLoadBalancer oldLb = event.oldValue().value();
191 KubevirtLoadBalancer newLb = event.newValue().value();
192
193 Set<IpAddress> added = new HashSet<>(newLb.members());
194 Set<IpAddress> oldSet = oldLb.members();
195
196 added.removeAll(oldSet);
197
198 if (added.size() > 0) {
199 notifyDelegate(new KubevirtLoadBalancerEvent(
200 KUBEVIRT_LOAD_BALANCER_MEMBER_ADDED,
201 newLb,
202 added
203 ));
204 }
205
206 Set<IpAddress> removed = new HashSet<>(oldLb.members());
207 Set<IpAddress> newSet = newLb.members();
208 removed.removeAll(newSet);
209
210 if (removed.size() > 0) {
211 notifyDelegate(new KubevirtLoadBalancerEvent(
212 KUBEVIRT_LOAD_BALANCER_MEMBER_REMOVED,
213 newLb,
214 removed
215 ));
216 }
217
218 notifyDelegate(new KubevirtLoadBalancerEvent(
219 KUBEVIRT_LOAD_BALANCER_UPDATED,
220 newLb,
221 oldLb
222 ));
223 }
Jian Lidaa7d6a2021-04-13 17:22:56 +0900224 }
225}