blob: 5016811d0bfc9f1263a5030aeb2abf06acd7613e [file] [log] [blame]
Madan Jampaniad3c5262016-01-20 00:50:17 -08001/*
2 * Copyright 2015-2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.cluster.impl;
17
Madan Jampani898fcca2016-02-26 12:42:08 -080018import static org.onlab.util.Tools.groupedThreads;
Madan Jampaniad3c5262016-01-20 00:50:17 -080019import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.File;
22import java.io.IOException;
Madan Jampani898fcca2016-02-26 12:42:08 -080023import java.nio.file.FileSystems;
24import java.nio.file.Path;
25import java.nio.file.StandardWatchEventKinds;
26import java.nio.file.WatchEvent;
27import java.nio.file.WatchKey;
28import java.nio.file.WatchService;
Madan Jampaniad3c5262016-01-20 00:50:17 -080029import java.util.Set;
Madan Jampani898fcca2016-02-26 12:42:08 -080030import java.util.concurrent.ExecutorService;
31import java.util.concurrent.Executors;
Madan Jampaniad3c5262016-01-20 00:50:17 -080032import java.util.concurrent.atomic.AtomicReference;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.onlab.packet.IpAddress;
40import org.onosproject.cluster.ClusterMetadata;
41import org.onosproject.cluster.ClusterMetadataProvider;
42import org.onosproject.cluster.ClusterMetadataProviderRegistry;
43import org.onosproject.cluster.ClusterMetadataProviderService;
44import org.onosproject.cluster.ControllerNode;
45import org.onosproject.cluster.DefaultControllerNode;
46import org.onosproject.cluster.DefaultPartition;
47import org.onosproject.cluster.NodeId;
48import org.onosproject.cluster.Partition;
49import org.onosproject.cluster.PartitionId;
50import org.onosproject.net.provider.ProviderId;
51import org.onosproject.store.service.Versioned;
52import org.slf4j.Logger;
53
54import com.fasterxml.jackson.core.JsonGenerator;
55import com.fasterxml.jackson.core.JsonParser;
56import com.fasterxml.jackson.core.JsonProcessingException;
57import com.fasterxml.jackson.databind.DeserializationContext;
58import com.fasterxml.jackson.databind.JsonDeserializer;
59import com.fasterxml.jackson.databind.JsonNode;
60import com.fasterxml.jackson.databind.JsonSerializer;
61import com.fasterxml.jackson.databind.ObjectMapper;
62import com.fasterxml.jackson.databind.SerializerProvider;
63import com.fasterxml.jackson.databind.module.SimpleModule;
64import com.google.common.base.Throwables;
65import com.google.common.collect.Sets;
66import com.google.common.io.Files;
67
68import static com.google.common.base.Preconditions.checkState;
69
70/**
71 * Provider of {@link ClusterMetadata cluster metadata} sourced from a local config file.
72 */
73@Component(immediate = true)
74public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataProvider {
75
76 private final Logger log = getLogger(getClass());
77
78 // constants for filed names (used in serialization)
79 private static final String ID = "id";
80 private static final String PORT = "port";
81 private static final String IP = "ip";
82
Madan Jampani898fcca2016-02-26 12:42:08 -080083 private static final String CONFIG_DIR = "../config";
84 private static final String CONFIG_FILE_NAME = "cluster.json";
85 private static final File CONFIG_FILE = new File(CONFIG_DIR, CONFIG_FILE_NAME);
Madan Jampaniad3c5262016-01-20 00:50:17 -080086
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterMetadataProviderRegistry providerRegistry;
89
90 private static final ProviderId PROVIDER_ID = new ProviderId("config", "none");
Madan Jampani898fcca2016-02-26 12:42:08 -080091 private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
92 private final ExecutorService configFileChangeDetector =
93 Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
Madan Jampaniad3c5262016-01-20 00:50:17 -080094
95 private ObjectMapper mapper;
96 private ClusterMetadataProviderService providerService;
97
98 @Activate
99 public void activate() {
100 mapper = new ObjectMapper();
101 SimpleModule module = new SimpleModule();
102 module.addSerializer(NodeId.class, new NodeIdSerializer());
103 module.addDeserializer(NodeId.class, new NodeIdDeserializer());
104 module.addSerializer(ControllerNode.class, new ControllerNodeSerializer());
105 module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer());
106 module.addDeserializer(Partition.class, new PartitionDeserializer());
107 module.addSerializer(PartitionId.class, new PartitionIdSerializer());
108 module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
109 mapper.registerModule(module);
110 providerService = providerRegistry.register(this);
Madan Jampani898fcca2016-02-26 12:42:08 -0800111 configFileChangeDetector.execute(() -> {
112 try {
113 watchConfigFile();
114 } catch (IOException e) {
115 log.warn("Failure in setting up a watch for config "
116 + "file updates. updates to {} will be ignored", CONFIG_FILE, e);
117 }
118 });
Madan Jampaniad3c5262016-01-20 00:50:17 -0800119 log.info("Started");
120 }
121
122 @Deactivate
123 public void deactivate() {
Madan Jampani898fcca2016-02-26 12:42:08 -0800124 configFileChangeDetector.shutdown();
Madan Jampaniad3c5262016-01-20 00:50:17 -0800125 providerRegistry.unregister(this);
126 log.info("Stopped");
127 }
128
129 @Override
130 public ProviderId id() {
131 return PROVIDER_ID;
132 }
133
134 @Override
135 public Versioned<ClusterMetadata> getClusterMetadata() {
136 checkState(isAvailable());
137 synchronized (this) {
138 if (cachedMetadata.get() == null) {
Madan Jampani898fcca2016-02-26 12:42:08 -0800139 cachedMetadata.set(fetchMetadata());
Madan Jampaniad3c5262016-01-20 00:50:17 -0800140 }
141 return cachedMetadata.get();
142 }
143 }
144
145 @Override
146 public void setClusterMetadata(ClusterMetadata metadata) {
147 try {
148 Files.createParentDirs(CONFIG_FILE);
149 mapper.writeValue(CONFIG_FILE, metadata);
150 providerService.clusterMetadataChanged(new Versioned<>(metadata, CONFIG_FILE.lastModified()));
151 } catch (IOException e) {
152 Throwables.propagate(e);
153 }
154 }
155
156 @Override
157 public void addActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
158 throw new UnsupportedOperationException();
159 }
160
161 @Override
162 public void removeActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
163 throw new UnsupportedOperationException();
164 }
165
166 @Override
167 public Set<NodeId> getActivePartitionMembers(PartitionId partitionId) {
168 throw new UnsupportedOperationException();
169 }
170
171 @Override
172 public boolean isAvailable() {
173 return CONFIG_FILE.exists();
174 }
175
Madan Jampani898fcca2016-02-26 12:42:08 -0800176 private Versioned<ClusterMetadata> fetchMetadata() {
Madan Jampaniad3c5262016-01-20 00:50:17 -0800177 ClusterMetadata metadata = null;
178 long version = 0;
179 try {
180 metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class);
Madan Jampani898fcca2016-02-26 12:42:08 -0800181 version = CONFIG_FILE.lastModified();
Madan Jampaniad3c5262016-01-20 00:50:17 -0800182 } catch (IOException e) {
183 Throwables.propagate(e);
184 }
Madan Jampani898fcca2016-02-26 12:42:08 -0800185 return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
186 metadata.getName(),
187 Sets.newHashSet(metadata.getNodes()),
188 Sets.newHashSet(metadata.getPartitions())),
189 version);
Madan Jampaniad3c5262016-01-20 00:50:17 -0800190 }
191
192 private static class PartitionDeserializer extends JsonDeserializer<Partition> {
193 @Override
194 public Partition deserialize(JsonParser jp, DeserializationContext ctxt)
195 throws IOException, JsonProcessingException {
196 return jp.readValueAs(DefaultPartition.class);
197 }
198 }
199
200 private static class PartitionIdSerializer extends JsonSerializer<PartitionId> {
201 @Override
202 public void serialize(PartitionId partitionId, JsonGenerator jgen, SerializerProvider provider)
203 throws IOException, JsonProcessingException {
204 jgen.writeNumber(partitionId.asInt());
205 }
206 }
207
208 private class PartitionIdDeserializer extends JsonDeserializer<PartitionId> {
209 @Override
210 public PartitionId deserialize(JsonParser jp, DeserializationContext ctxt)
211 throws IOException, JsonProcessingException {
212 JsonNode node = jp.getCodec().readTree(jp);
213 return new PartitionId(node.asInt());
214 }
215 }
216
217 private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> {
218 @Override
219 public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider)
220 throws IOException, JsonProcessingException {
221 jgen.writeStartObject();
222 jgen.writeStringField(ID, node.id().toString());
223 jgen.writeStringField(IP, node.ip().toString());
224 jgen.writeNumberField(PORT, node.tcpPort());
225 jgen.writeEndObject();
226 }
227 }
228
229 private static class ControllerNodeDeserializer extends JsonDeserializer<ControllerNode> {
230 @Override
231 public ControllerNode deserialize(JsonParser jp, DeserializationContext ctxt)
232 throws IOException, JsonProcessingException {
233 JsonNode node = jp.getCodec().readTree(jp);
234 NodeId nodeId = new NodeId(node.get(ID).textValue());
235 IpAddress ip = IpAddress.valueOf(node.get(IP).textValue());
236 int port = node.get(PORT).asInt();
237 return new DefaultControllerNode(nodeId, ip, port);
238 }
239 }
240
241 private static class NodeIdSerializer extends JsonSerializer<NodeId> {
242 @Override
243 public void serialize(NodeId nodeId, JsonGenerator jgen, SerializerProvider provider)
244 throws IOException, JsonProcessingException {
245 jgen.writeString(nodeId.toString());
246 }
247 }
248
249 private class NodeIdDeserializer extends JsonDeserializer<NodeId> {
250 @Override
251 public NodeId deserialize(JsonParser jp, DeserializationContext ctxt)
252 throws IOException, JsonProcessingException {
253 JsonNode node = jp.getCodec().readTree(jp);
254 return new NodeId(node.asText());
255 }
256 }
Madan Jampani898fcca2016-02-26 12:42:08 -0800257
258 /**
259 * Monitors the config file for any updates and notifies providerService accordingly.
260 * @throws IOException
261 */
262 private void watchConfigFile() throws IOException {
263 WatchService watcher = FileSystems.getDefault().newWatchService();
264 Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR);
265 configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
266 while (true) {
267 try {
268 final WatchKey watchKey = watcher.take();
269 for (WatchEvent<?> event : watchKey.pollEvents()) {
270 final Path changed = (Path) event.context();
271 log.info("{} was updated", changed);
272 // TODO: Fix concurrency issues
273 Versioned<ClusterMetadata> latestMetadata = fetchMetadata();
274 cachedMetadata.set(latestMetadata);
275 providerService.clusterMetadataChanged(latestMetadata);
276 }
277 if (!watchKey.reset()) {
278 log.debug("WatchKey has been unregistered");
279 break;
280 }
281 } catch (InterruptedException e) {
282 Thread.currentThread().interrupt();
283 break;
284 }
285 }
286 }
Madan Jampaniad3c5262016-01-20 00:50:17 -0800287}