blob: fd267637bf1c4af8c156b111afa060e2598a9067 [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.onlab.packet.IpAddress;
20import org.onlab.util.Tools;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.event.ListenerRegistry;
26import org.onosproject.k8snode.api.K8sHost;
27import org.onosproject.k8snode.api.K8sHostAdminService;
28import org.onosproject.k8snode.api.K8sHostEvent;
29import org.onosproject.k8snode.api.K8sHostListener;
30import org.onosproject.k8snode.api.K8sHostService;
31import org.onosproject.k8snode.api.K8sHostStore;
32import org.onosproject.k8snode.api.K8sHostStoreDelegate;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.store.service.StorageService;
35import org.osgi.service.component.ComponentContext;
36import org.osgi.service.component.annotations.Activate;
37import org.osgi.service.component.annotations.Component;
38import org.osgi.service.component.annotations.Deactivate;
39import org.osgi.service.component.annotations.Modified;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.util.Dictionary;
45import java.util.Objects;
46import java.util.Set;
47import java.util.concurrent.ExecutorService;
48import java.util.stream.Collectors;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static com.google.common.base.Preconditions.checkNotNull;
52import static java.util.concurrent.Executors.newSingleThreadExecutor;
53import static org.onlab.util.Tools.groupedThreads;
54import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
55import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
56import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * Service administering the inventory of kubernetes hosts.
61 */
62@Component(
63 immediate = true,
64 service = {K8sHostService.class, K8sHostAdminService.class},
65 property = {
66 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
67 }
68)
69public class K8sHostManager
70 extends ListenerRegistry<K8sHostEvent, K8sHostListener>
71 implements K8sHostService, K8sHostAdminService {
72
73 private final Logger log = getLogger(getClass());
74
75 private static final String MSG_HOST = "Kubernetes host %s %s";
76 private static final String MSG_CREATED = "created";
77 private static final String MSG_UPDATED = "updated";
78 private static final String MSG_REMOVED = "removed";
79
80 private static final String ERR_NULL_HOST = "Kubernetes host cannot be null";
81 private static final String ERR_NULL_HOST_IP = "Kubernetes host IP cannot be null";
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected K8sHostStore hostStore;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected CoreService coreService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected ClusterService clusterService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected LeadershipService leadershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected StorageService storageService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected DeviceService deviceService;
100
101 /** OVSDB server listen port. */
102 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
103
104 private final ExecutorService eventExecutor = newSingleThreadExecutor(
105 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
106
107 private final K8sHostStoreDelegate delegate = new K8sHostManager.InternalHostStoreDelegate();
108
109 private ApplicationId appId;
110
111 @Activate
112 protected void activate() {
113 appId = coreService.registerApplication(APP_ID);
114 hostStore.setDelegate(delegate);
115
116 leadershipService.runForLeadership(appId.name());
117 log.info("Started");
118 }
119
120 @Deactivate
121 protected void deactivate() {
122 hostStore.unsetDelegate(delegate);
123
124 leadershipService.withdraw(appId.name());
125 eventExecutor.shutdown();
126
127 log.info("Stopped");
128 }
129
130 @Modified
131 protected void modified(ComponentContext context) {
132 Dictionary<?, ?> properties = context.getProperties();
133 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
134 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
135 ovsdbPortNum = updatedOvsdbPort;
136 }
137
138 log.info("Modified");
139 }
140
141 @Override
142 public void createHost(K8sHost host) {
143 checkNotNull(host, ERR_NULL_HOST);
144
145 hostStore.createHost(host);
146
147 log.info(String.format(MSG_HOST, host.hostIp().toString(), MSG_CREATED));
148 }
149
150 @Override
151 public void updateHost(K8sHost host) {
152 checkNotNull(host, ERR_NULL_HOST);
153
154 hostStore.updateHost(host);
155
156 log.info(String.format(MSG_HOST, host.hostIp().toString(), MSG_UPDATED));
157 }
158
159 @Override
160 public K8sHost removeHost(IpAddress hostIp) {
161 checkArgument(hostIp != null, ERR_NULL_HOST_IP);
162
163 K8sHost host = hostStore.removeHost(hostIp);
164 log.info(String.format(MSG_HOST, hostIp.toString(), MSG_REMOVED));
165
166 return host;
167 }
168
169 @Override
170 public Set<K8sHost> hosts() {
171 return hostStore.hosts();
172 }
173
174 @Override
175 public Set<K8sHost> completeHosts() {
176 Set<K8sHost> hosts = hostStore.hosts().stream()
177 .filter(h -> Objects.equals(h.state(), COMPLETE))
178 .collect(Collectors.toSet());
179 return ImmutableSet.copyOf(hosts);
180 }
181
182 @Override
183 public K8sHost host(IpAddress hostIp) {
184 return hostStore.hosts().stream()
185 .filter(h -> Objects.equals(h.hostIp(), hostIp))
186 .findFirst().orElse(null);
187 }
188
189 private class InternalHostStoreDelegate implements K8sHostStoreDelegate {
190
191 @Override
192 public void notify(K8sHostEvent event) {
193 if (event != null) {
194 log.trace("send kubernetes host event {}", event);
195 process(event);
196 }
197 }
198 }
199}