blob: 6db036595a581d25dc47aa92f518bf68bec3f5cb [file] [log] [blame]
Madan Jampaniad3c5262016-01-20 00:50:17 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampaniad3c5262016-01-20 00:50:17 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.cluster.impl;
17
Madan Jampani898fcca2016-02-26 12:42:08 -080018import static org.onlab.util.Tools.groupedThreads;
Madan Jampaniad3c5262016-01-20 00:50:17 -080019import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.File;
Madan Jampanif172d402016-03-04 00:56:38 -080022import java.io.FileInputStream;
Madan Jampaniad3c5262016-01-20 00:50:17 -080023import java.io.IOException;
Jon Halldb2cf752016-06-28 16:24:45 -070024import java.io.InputStream;
Madan Jampanif172d402016-03-04 00:56:38 -080025import java.net.URL;
26import java.net.URLConnection;
Madan Jampaniad3c5262016-01-20 00:50:17 -080027import java.util.Set;
Madan Jampanif172d402016-03-04 00:56:38 -080028import java.util.concurrent.ScheduledExecutorService;
29import java.util.concurrent.TimeUnit;
Madan Jampaniad3c5262016-01-20 00:50:17 -080030import java.util.concurrent.atomic.AtomicReference;
31
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Reference;
36import org.apache.felix.scr.annotations.ReferenceCardinality;
37import org.onlab.packet.IpAddress;
38import org.onosproject.cluster.ClusterMetadata;
39import org.onosproject.cluster.ClusterMetadataProvider;
40import org.onosproject.cluster.ClusterMetadataProviderRegistry;
41import org.onosproject.cluster.ClusterMetadataProviderService;
42import org.onosproject.cluster.ControllerNode;
43import org.onosproject.cluster.DefaultControllerNode;
44import org.onosproject.cluster.DefaultPartition;
45import org.onosproject.cluster.NodeId;
46import org.onosproject.cluster.Partition;
47import org.onosproject.cluster.PartitionId;
48import org.onosproject.net.provider.ProviderId;
49import org.onosproject.store.service.Versioned;
50import org.slf4j.Logger;
51
52import com.fasterxml.jackson.core.JsonGenerator;
53import com.fasterxml.jackson.core.JsonParser;
54import com.fasterxml.jackson.core.JsonProcessingException;
55import com.fasterxml.jackson.databind.DeserializationContext;
56import com.fasterxml.jackson.databind.JsonDeserializer;
57import com.fasterxml.jackson.databind.JsonNode;
58import com.fasterxml.jackson.databind.JsonSerializer;
59import com.fasterxml.jackson.databind.ObjectMapper;
60import com.fasterxml.jackson.databind.SerializerProvider;
61import com.fasterxml.jackson.databind.module.SimpleModule;
62import com.google.common.base.Throwables;
63import com.google.common.collect.Sets;
64import com.google.common.io.Files;
65
66import static com.google.common.base.Preconditions.checkState;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070067import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Madan Jampaniad3c5262016-01-20 00:50:17 -080068
69/**
70 * Provider of {@link ClusterMetadata cluster metadata} sourced from a local config file.
71 */
72@Component(immediate = true)
73public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataProvider {
74
75 private final Logger log = getLogger(getClass());
76
77 // constants for filed names (used in serialization)
78 private static final String ID = "id";
79 private static final String PORT = "port";
80 private static final String IP = "ip";
81
Madan Jampani898fcca2016-02-26 12:42:08 -080082 private static final String CONFIG_DIR = "../config";
83 private static final String CONFIG_FILE_NAME = "cluster.json";
84 private static final File CONFIG_FILE = new File(CONFIG_DIR, CONFIG_FILE_NAME);
Madan Jampaniad3c5262016-01-20 00:50:17 -080085
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected ClusterMetadataProviderRegistry providerRegistry;
88
Madan Jampanif172d402016-03-04 00:56:38 -080089 private static final ProviderId PROVIDER_ID = new ProviderId("file", "none");
Madan Jampani898fcca2016-02-26 12:42:08 -080090 private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
Madan Jampanif172d402016-03-04 00:56:38 -080091 private final ScheduledExecutorService configFileChangeDetector =
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070092 newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", "", log));
Madan Jampaniad3c5262016-01-20 00:50:17 -080093
Madan Jampanif172d402016-03-04 00:56:38 -080094 private String metadataUrl;
Madan Jampaniad3c5262016-01-20 00:50:17 -080095 private ObjectMapper mapper;
96 private ClusterMetadataProviderService providerService;
97
98 @Activate
99 public void activate() {
100 mapper = new ObjectMapper();
101 SimpleModule module = new SimpleModule();
102 module.addSerializer(NodeId.class, new NodeIdSerializer());
103 module.addDeserializer(NodeId.class, new NodeIdDeserializer());
104 module.addSerializer(ControllerNode.class, new ControllerNodeSerializer());
105 module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer());
106 module.addDeserializer(Partition.class, new PartitionDeserializer());
107 module.addSerializer(PartitionId.class, new PartitionIdSerializer());
108 module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
109 mapper.registerModule(module);
110 providerService = providerRegistry.register(this);
Madan Jampanif172d402016-03-04 00:56:38 -0800111 metadataUrl = System.getProperty("onos.cluster.metadata.uri", "file://" + CONFIG_DIR + "/" + CONFIG_FILE);
112 configFileChangeDetector.scheduleWithFixedDelay(() -> watchUrl(metadataUrl), 100, 500, TimeUnit.MILLISECONDS);
Madan Jampaniad3c5262016-01-20 00:50:17 -0800113 log.info("Started");
114 }
115
116 @Deactivate
117 public void deactivate() {
Madan Jampani898fcca2016-02-26 12:42:08 -0800118 configFileChangeDetector.shutdown();
Madan Jampaniad3c5262016-01-20 00:50:17 -0800119 providerRegistry.unregister(this);
120 log.info("Stopped");
121 }
122
123 @Override
124 public ProviderId id() {
125 return PROVIDER_ID;
126 }
127
128 @Override
129 public Versioned<ClusterMetadata> getClusterMetadata() {
130 checkState(isAvailable());
131 synchronized (this) {
132 if (cachedMetadata.get() == null) {
Madan Jampanif172d402016-03-04 00:56:38 -0800133 cachedMetadata.set(fetchMetadata(metadataUrl));
Madan Jampaniad3c5262016-01-20 00:50:17 -0800134 }
135 return cachedMetadata.get();
136 }
137 }
138
139 @Override
140 public void setClusterMetadata(ClusterMetadata metadata) {
141 try {
142 Files.createParentDirs(CONFIG_FILE);
143 mapper.writeValue(CONFIG_FILE, metadata);
144 providerService.clusterMetadataChanged(new Versioned<>(metadata, CONFIG_FILE.lastModified()));
145 } catch (IOException e) {
146 Throwables.propagate(e);
147 }
148 }
149
150 @Override
151 public void addActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
152 throw new UnsupportedOperationException();
153 }
154
155 @Override
156 public void removeActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
157 throw new UnsupportedOperationException();
158 }
159
160 @Override
161 public Set<NodeId> getActivePartitionMembers(PartitionId partitionId) {
162 throw new UnsupportedOperationException();
163 }
164
165 @Override
166 public boolean isAvailable() {
Madan Jampanif172d402016-03-04 00:56:38 -0800167 try {
168 URL url = new URL(metadataUrl);
169 if (url.getProtocol().equals("file")) {
170 File file = new File(metadataUrl.replaceFirst("file://", ""));
171 return file.exists();
172 } else if (url.getProtocol().equals("http")) {
Jon Halldb2cf752016-06-28 16:24:45 -0700173 try (InputStream file = url.openStream()) {
174 return true;
175 }
Madan Jampanif172d402016-03-04 00:56:38 -0800176 } else {
177 // Unsupported protocol
178 return false;
179 }
180 } catch (Exception e) {
Jon Halldb2cf752016-06-28 16:24:45 -0700181 log.warn("Exception accessing metadata file at {}:", metadataUrl, e);
Madan Jampanif172d402016-03-04 00:56:38 -0800182 return false;
183 }
Madan Jampaniad3c5262016-01-20 00:50:17 -0800184 }
185
Madan Jampanif172d402016-03-04 00:56:38 -0800186 private Versioned<ClusterMetadata> fetchMetadata(String metadataUrl) {
Madan Jampaniad3c5262016-01-20 00:50:17 -0800187 try {
Madan Jampanif172d402016-03-04 00:56:38 -0800188 URL url = new URL(metadataUrl);
189 ClusterMetadata metadata = null;
190 long version = 0;
191 if (url.getProtocol().equals("file")) {
192 File file = new File(metadataUrl.replaceFirst("file://", ""));
193 version = file.lastModified();
194 metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
195 } else if (url.getProtocol().equals("http")) {
196 URLConnection conn = url.openConnection();
197 version = conn.getLastModified();
198 metadata = mapper.readValue(conn.getInputStream(), ClusterMetadata.class);
199 }
200 return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
201 metadata.getName(),
202 Sets.newHashSet(metadata.getNodes()),
203 Sets.newHashSet(metadata.getPartitions())),
204 version);
Madan Jampaniad3c5262016-01-20 00:50:17 -0800205 } catch (IOException e) {
Madan Jampanif172d402016-03-04 00:56:38 -0800206 throw Throwables.propagate(e);
Madan Jampaniad3c5262016-01-20 00:50:17 -0800207 }
Madan Jampaniad3c5262016-01-20 00:50:17 -0800208 }
209
210 private static class PartitionDeserializer extends JsonDeserializer<Partition> {
211 @Override
212 public Partition deserialize(JsonParser jp, DeserializationContext ctxt)
213 throws IOException, JsonProcessingException {
214 return jp.readValueAs(DefaultPartition.class);
215 }
216 }
217
218 private static class PartitionIdSerializer extends JsonSerializer<PartitionId> {
219 @Override
220 public void serialize(PartitionId partitionId, JsonGenerator jgen, SerializerProvider provider)
221 throws IOException, JsonProcessingException {
222 jgen.writeNumber(partitionId.asInt());
223 }
224 }
225
226 private class PartitionIdDeserializer extends JsonDeserializer<PartitionId> {
227 @Override
228 public PartitionId deserialize(JsonParser jp, DeserializationContext ctxt)
229 throws IOException, JsonProcessingException {
230 JsonNode node = jp.getCodec().readTree(jp);
231 return new PartitionId(node.asInt());
232 }
233 }
234
235 private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> {
236 @Override
237 public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider)
238 throws IOException, JsonProcessingException {
239 jgen.writeStartObject();
240 jgen.writeStringField(ID, node.id().toString());
241 jgen.writeStringField(IP, node.ip().toString());
242 jgen.writeNumberField(PORT, node.tcpPort());
243 jgen.writeEndObject();
244 }
245 }
246
247 private static class ControllerNodeDeserializer extends JsonDeserializer<ControllerNode> {
248 @Override
249 public ControllerNode deserialize(JsonParser jp, DeserializationContext ctxt)
250 throws IOException, JsonProcessingException {
251 JsonNode node = jp.getCodec().readTree(jp);
252 NodeId nodeId = new NodeId(node.get(ID).textValue());
253 IpAddress ip = IpAddress.valueOf(node.get(IP).textValue());
254 int port = node.get(PORT).asInt();
255 return new DefaultControllerNode(nodeId, ip, port);
256 }
257 }
258
259 private static class NodeIdSerializer extends JsonSerializer<NodeId> {
260 @Override
261 public void serialize(NodeId nodeId, JsonGenerator jgen, SerializerProvider provider)
262 throws IOException, JsonProcessingException {
263 jgen.writeString(nodeId.toString());
264 }
265 }
266
267 private class NodeIdDeserializer extends JsonDeserializer<NodeId> {
268 @Override
269 public NodeId deserialize(JsonParser jp, DeserializationContext ctxt)
270 throws IOException, JsonProcessingException {
271 JsonNode node = jp.getCodec().readTree(jp);
272 return new NodeId(node.asText());
273 }
274 }
Madan Jampani898fcca2016-02-26 12:42:08 -0800275
276 /**
Madan Jampanif172d402016-03-04 00:56:38 -0800277 * Monitors the metadata url for any updates and notifies providerService accordingly.
Madan Jampani898fcca2016-02-26 12:42:08 -0800278 */
Madan Jampanif172d402016-03-04 00:56:38 -0800279 private void watchUrl(String metadataUrl) {
280 // TODO: We are merely polling the url.
281 // This can be easily addressed for files. For http urls we need to move to a push style protocol.
282 Versioned<ClusterMetadata> latestMetadata = fetchMetadata(metadataUrl);
283 if (cachedMetadata.get() != null && cachedMetadata.get().version() < latestMetadata.version()) {
284 cachedMetadata.set(latestMetadata);
285 providerService.clusterMetadataChanged(latestMetadata);
Madan Jampani898fcca2016-02-26 12:42:08 -0800286 }
287 }
Madan Jampaniad3c5262016-01-20 00:50:17 -0800288}