blob: 1747ada64a4b73d4312b0ee4ed2b8e9d6625cd6b [file] [log] [blame]
Jian Lie2a04ce2020-07-01 19:07:02 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.packet.IpAddress;
20import org.onlab.util.Tools;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.event.ListenerRegistry;
26import org.onosproject.k8snode.api.K8sHost;
27import org.onosproject.k8snode.api.K8sHostAdminService;
28import org.onosproject.k8snode.api.K8sHostEvent;
29import org.onosproject.k8snode.api.K8sHostListener;
30import org.onosproject.k8snode.api.K8sHostService;
31import org.onosproject.k8snode.api.K8sHostStore;
32import org.onosproject.k8snode.api.K8sHostStoreDelegate;
Jian Li077b07e2020-09-01 16:55:25 +090033import org.onosproject.net.DeviceId;
Jian Lie2a04ce2020-07-01 19:07:02 +090034import org.onosproject.net.device.DeviceService;
35import org.onosproject.store.service.StorageService;
36import org.osgi.service.component.ComponentContext;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Modified;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.util.Dictionary;
46import java.util.Objects;
47import java.util.Set;
48import java.util.concurrent.ExecutorService;
49import java.util.stream.Collectors;
50
51import static com.google.common.base.Preconditions.checkArgument;
52import static com.google.common.base.Preconditions.checkNotNull;
53import static java.util.concurrent.Executors.newSingleThreadExecutor;
54import static org.onlab.util.Tools.groupedThreads;
55import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
56import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
57import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Service administering the inventory of kubernetes hosts.
62 */
63@Component(
64 immediate = true,
65 service = {K8sHostService.class, K8sHostAdminService.class},
66 property = {
67 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
68 }
69)
70public class K8sHostManager
71 extends ListenerRegistry<K8sHostEvent, K8sHostListener>
72 implements K8sHostService, K8sHostAdminService {
73
74 private final Logger log = getLogger(getClass());
75
76 private static final String MSG_HOST = "Kubernetes host %s %s";
77 private static final String MSG_CREATED = "created";
78 private static final String MSG_UPDATED = "updated";
79 private static final String MSG_REMOVED = "removed";
80
81 private static final String ERR_NULL_HOST = "Kubernetes host cannot be null";
82 private static final String ERR_NULL_HOST_IP = "Kubernetes host IP cannot be null";
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected K8sHostStore hostStore;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected CoreService coreService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected ClusterService clusterService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected LeadershipService leadershipService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected StorageService storageService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected DeviceService deviceService;
101
102 /** OVSDB server listen port. */
103 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
104
105 private final ExecutorService eventExecutor = newSingleThreadExecutor(
106 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
107
108 private final K8sHostStoreDelegate delegate = new K8sHostManager.InternalHostStoreDelegate();
109
110 private ApplicationId appId;
111
112 @Activate
113 protected void activate() {
114 appId = coreService.registerApplication(APP_ID);
115 hostStore.setDelegate(delegate);
116
117 leadershipService.runForLeadership(appId.name());
118 log.info("Started");
119 }
120
121 @Deactivate
122 protected void deactivate() {
123 hostStore.unsetDelegate(delegate);
124
125 leadershipService.withdraw(appId.name());
126 eventExecutor.shutdown();
127
128 log.info("Stopped");
129 }
130
131 @Modified
132 protected void modified(ComponentContext context) {
133 Dictionary<?, ?> properties = context.getProperties();
134 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
135 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
136 ovsdbPortNum = updatedOvsdbPort;
137 }
138
139 log.info("Modified");
140 }
141
142 @Override
143 public void createHost(K8sHost host) {
144 checkNotNull(host, ERR_NULL_HOST);
145
146 hostStore.createHost(host);
147
148 log.info(String.format(MSG_HOST, host.hostIp().toString(), MSG_CREATED));
149 }
150
151 @Override
152 public void updateHost(K8sHost host) {
153 checkNotNull(host, ERR_NULL_HOST);
154
155 hostStore.updateHost(host);
156
157 log.info(String.format(MSG_HOST, host.hostIp().toString(), MSG_UPDATED));
158 }
159
160 @Override
161 public K8sHost removeHost(IpAddress hostIp) {
162 checkArgument(hostIp != null, ERR_NULL_HOST_IP);
163
164 K8sHost host = hostStore.removeHost(hostIp);
165 log.info(String.format(MSG_HOST, hostIp.toString(), MSG_REMOVED));
166
167 return host;
168 }
169
170 @Override
171 public Set<K8sHost> hosts() {
172 return hostStore.hosts();
173 }
174
175 @Override
176 public Set<K8sHost> completeHosts() {
177 Set<K8sHost> hosts = hostStore.hosts().stream()
178 .filter(h -> Objects.equals(h.state(), COMPLETE))
179 .collect(Collectors.toSet());
180 return ImmutableSet.copyOf(hosts);
181 }
182
183 @Override
184 public K8sHost host(IpAddress hostIp) {
185 return hostStore.hosts().stream()
186 .filter(h -> Objects.equals(h.hostIp(), hostIp))
187 .findFirst().orElse(null);
188 }
189
Jian Li077b07e2020-09-01 16:55:25 +0900190 @Override
191 public K8sHost host(DeviceId deviceId) {
192 return hostStore.hosts().stream()
193 .filter(host -> Objects.equals(host.ovsdb(), deviceId))
194 .findFirst().orElse(null);
195 }
196
197 @Override
198 public K8sHost hostByTunBridge(DeviceId deviceId) {
199 for (K8sHost host : hostStore.hosts()) {
200 long cnt = host.tunBridges().stream().filter(
201 br -> br.dpid().equals(deviceId.toString())).count();
202 if (cnt > 0) {
203 return host;
204 }
205 }
206 return null;
207 }
208
Jian Lie2a04ce2020-07-01 19:07:02 +0900209 private class InternalHostStoreDelegate implements K8sHostStoreDelegate {
210
211 @Override
212 public void notify(K8sHostEvent event) {
213 if (event != null) {
214 log.trace("send kubernetes host event {}", event);
215 process(event);
216 }
217 }
218 }
219}