blob: 5225549bd6b0108b14928d7a8e66779a02494e49 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani0f6ad142015-05-13 17:10:04 -070020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -080021import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampanif7536ab2015-05-07 23:23:23 -070028import java.util.concurrent.CompletableFuture;
Madan Jampani84b6b402015-02-25 17:49:54 -080029import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
Madan Jampanif7536ab2015-05-07 23:23:23 -070031import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.TimeUnit;
Madan Jampani84b6b402015-02-25 17:49:54 -080033import java.util.regex.Matcher;
34import java.util.regex.Pattern;
35import java.util.stream.Collectors;
36
37import org.apache.felix.scr.annotations.Activate;
38import org.apache.felix.scr.annotations.Component;
39import org.apache.felix.scr.annotations.Deactivate;
40import org.apache.felix.scr.annotations.Reference;
41import org.apache.felix.scr.annotations.ReferenceCardinality;
42import org.apache.felix.scr.annotations.Service;
43import org.onlab.util.KryoNamespace;
44import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.Leadership;
46import org.onosproject.cluster.LeadershipEvent;
47import org.onosproject.cluster.LeadershipEventListener;
48import org.onosproject.cluster.LeadershipService;
49import org.onosproject.cluster.NodeId;
50import org.onosproject.cluster.RoleInfo;
51import org.onosproject.mastership.MastershipEvent;
52import org.onosproject.mastership.MastershipStore;
53import org.onosproject.mastership.MastershipStoreDelegate;
54import org.onosproject.mastership.MastershipTerm;
55import org.onosproject.net.DeviceId;
56import org.onosproject.net.MastershipRole;
57import org.onosproject.store.AbstractStore;
58import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani84b6b402015-02-25 17:49:54 -080059import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.serializers.KryoSerializer;
62import org.onosproject.store.serializers.StoreSerializer;
63import org.slf4j.Logger;
64
65import com.google.common.base.Objects;
66import com.google.common.collect.Lists;
67import com.google.common.collect.Maps;
68import com.google.common.collect.Sets;
69
70/**
71 * Implementation of the MastershipStore on top of Leadership Service.
72 */
Madan Jampani5756c352015-04-29 00:23:58 -070073@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080074@Service
75public class ConsistentDeviceMastershipStore
76 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
77 implements MastershipStore {
78
79 private final Logger log = getLogger(getClass());
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected LeadershipService leadershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterService clusterService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterCommunicationService clusterCommunicator;
89
90 private NodeId localNodeId;
91 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
92
93 private static final MessageSubject ROLE_QUERY_SUBJECT =
94 new MessageSubject("mastership-store-device-role-query");
95 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
96 new MessageSubject("mastership-store-device-role-relinquish");
Madan Jampani9bd1f152015-04-30 23:33:35 -070097 private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
Madan Jampani1af8e132015-04-30 16:41:18 -070098 new MessageSubject("mastership-store-device-mastership-relinquish");
Madan Jampani84b6b402015-02-25 17:49:54 -080099
100 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -0700101 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -0800102
Madan Jampani84b6b402015-02-25 17:49:54 -0800103 private ExecutorService messageHandlingExecutor;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700104 private ScheduledExecutorService transferExecutor;
Madan Jampani84b6b402015-02-25 17:49:54 -0800105 private final LeadershipEventListener leadershipEventListener =
106 new InternalDeviceMastershipEventListener();
107
108 private static final String NODE_ID_NULL = "Node ID cannot be null";
Madan Jampanif7536ab2015-05-07 23:23:23 -0700109 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
110 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
Madan Jampani84b6b402015-02-25 17:49:54 -0800111
112 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
113 @Override
114 protected void setupKryoPool() {
115 serializerPool = KryoNamespace.newBuilder()
116 .register(KryoNamespaces.API)
117 .register(MastershipRole.class)
118 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700119 .register(MastershipEvent.Type.class)
Madan Jampani84b6b402015-02-25 17:49:54 -0800120 .build();
121 }
122 };
123
124 @Activate
125 public void activate() {
126 messageHandlingExecutor =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700127 Executors.newSingleThreadExecutor(
128 groupedThreads("onos/store/device/mastership", "message-handler"));
129 transferExecutor =
130 Executors.newSingleThreadScheduledExecutor(
131 groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
Madan Jampanid46e18f2015-05-04 23:19:33 -0700132 clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
133 SERIALIZER::decode,
134 deviceId -> getRole(localNodeId, deviceId),
135 SERIALIZER::encode,
Madan Jampani84b6b402015-02-25 17:49:54 -0800136 messageHandlingExecutor);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700137 clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
138 SERIALIZER::decode,
Madan Jampanif7536ab2015-05-07 23:23:23 -0700139 this::relinquishLocalRole,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700140 SERIALIZER::encode,
141 messageHandlingExecutor);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700142 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700143 SERIALIZER::decode,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700144 this::transitionFromMasterToStandby,
Madan Jampani1af8e132015-04-30 16:41:18 -0700145 SERIALIZER::encode,
146 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800147 localNodeId = clusterService.getLocalNode().id();
148 leadershipService.addListener(leadershipEventListener);
149
150 log.info("Started.");
151 }
152
153 @Deactivate
154 public void deactivate() {
155 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
156 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700157 clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
Madan Jampani84b6b402015-02-25 17:49:54 -0800158 messageHandlingExecutor.shutdown();
Madan Jampanif7536ab2015-05-07 23:23:23 -0700159 transferExecutor.shutdown();
Madan Jampani84b6b402015-02-25 17:49:54 -0800160 leadershipService.removeListener(leadershipEventListener);
161
162 log.info("Stoppped.");
163 }
164
165 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700166 public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800167 checkArgument(deviceId != null, DEVICE_ID_NULL);
168
169 String leadershipTopic = createDeviceMastershipTopic(deviceId);
170 if (connectedDevices.add(deviceId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700171 return leadershipService.runForLeadership(leadershipTopic)
172 .thenApply(leadership -> {
173 return Objects.equal(localNodeId, leadership.leader())
174 ? MastershipRole.MASTER : MastershipRole.STANDBY;
175 });
Madan Jampani84b6b402015-02-25 17:49:54 -0800176 } else {
177 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
178 if (leadership != null && leadership.leader().equals(localNodeId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700179 return CompletableFuture.completedFuture(MastershipRole.MASTER);
Madan Jampani84b6b402015-02-25 17:49:54 -0800180 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700181 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
Madan Jampani84b6b402015-02-25 17:49:54 -0800182 }
183 }
184 }
185
186 @Override
187 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
188 checkArgument(nodeId != null, NODE_ID_NULL);
189 checkArgument(deviceId != null, DEVICE_ID_NULL);
190
191 String leadershipTopic = createDeviceMastershipTopic(deviceId);
192 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
193 if (leadership != null && nodeId.equals(leadership.leader())) {
194 return MastershipRole.MASTER;
195 }
196
197 if (localNodeId.equals(nodeId)) {
198 if (connectedDevices.contains(deviceId)) {
199 return MastershipRole.STANDBY;
200 } else {
201 return MastershipRole.NONE;
202 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800203 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700204 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700205 deviceId,
206 ROLE_QUERY_SUBJECT,
207 SERIALIZER::encode,
208 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700209 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700210 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800211 }
212
213 @Override
214 public NodeId getMaster(DeviceId deviceId) {
215 checkArgument(deviceId != null, DEVICE_ID_NULL);
216
217 String leadershipTopic = createDeviceMastershipTopic(deviceId);
218 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
219 return leadership != null ? leadership.leader() : null;
220 }
221
222 @Override
223 public RoleInfo getNodes(DeviceId deviceId) {
224 checkArgument(deviceId != null, DEVICE_ID_NULL);
225
226 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
227 clusterService
228 .getNodes()
Madan Jampani84b6b402015-02-25 17:49:54 -0800229 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
230
231 NodeId master = null;
232 final List<NodeId> standbys = Lists.newLinkedList();
233
Madan Jampani86940d92015-05-06 11:47:57 -0700234 List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
235
Madan Jampani84b6b402015-02-25 17:49:54 -0800236 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
237 if (entry.getValue() == MastershipRole.MASTER) {
238 master = entry.getKey();
239 } else if (entry.getValue() == MastershipRole.STANDBY) {
240 standbys.add(entry.getKey());
241 }
242 }
243
Madan Jampani86940d92015-05-06 11:47:57 -0700244 List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
245
246 return new RoleInfo(master, sortedStandbyList);
Madan Jampani84b6b402015-02-25 17:49:54 -0800247 }
248
249 @Override
250 public Set<DeviceId> getDevices(NodeId nodeId) {
251 checkArgument(nodeId != null, NODE_ID_NULL);
252
253 return leadershipService
254 .ownedTopics(nodeId)
255 .stream()
256 .filter(this::isDeviceMastershipTopic)
257 .map(this::extractDeviceIdFromTopic)
258 .collect(Collectors.toSet());
259 }
260
261 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700262 public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800263 checkArgument(nodeId != null, NODE_ID_NULL);
264 checkArgument(deviceId != null, DEVICE_ID_NULL);
265
Madan Jampani1af8e132015-04-30 16:41:18 -0700266 NodeId currentMaster = getMaster(deviceId);
267 if (nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700268 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700269 } else {
270 String leadershipTopic = createDeviceMastershipTopic(deviceId);
271 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
272 if (candidates.isEmpty()) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700273 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700274 }
275 if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700276 CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
277 // There is brief wait before we step down from mastership.
278 // This is to ensure any work that happens when standby preference
279 // order changes can complete. For example: flow entries need to be backed
280 // to the new top standby (ONOS-1883)
281 // FIXME: This potentially introduces a race-condition.
282 // Right now role changes are only forced via CLI.
283 transferExecutor.schedule(() -> {
284 result.complete(transitionFromMasterToStandby(deviceId));
285 }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
286 return result;
Madan Jampani1af8e132015-04-30 16:41:18 -0700287 } else {
288 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
289 }
290 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700291 return CompletableFuture.completedFuture(null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800292 }
293
294 @Override
295 public MastershipTerm getTermFor(DeviceId deviceId) {
296 checkArgument(deviceId != null, DEVICE_ID_NULL);
297
298 String leadershipTopic = createDeviceMastershipTopic(deviceId);
299 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
300 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
301 }
302
303 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700304 public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800305 checkArgument(nodeId != null, NODE_ID_NULL);
306 checkArgument(deviceId != null, DEVICE_ID_NULL);
307
Madan Jampani1af8e132015-04-30 16:41:18 -0700308 NodeId currentMaster = getMaster(deviceId);
309 if (!nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700310 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700311 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700312
313 String leadershipTopic = createDeviceMastershipTopic(deviceId);
314 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
315
316 NodeId newMaster = candidates.stream()
317 .filter(candidate -> !Objects.equal(nodeId, candidate))
318 .findFirst()
319 .orElse(null);
320 log.info("Transitioning to role {} for {}. Next master: {}",
321 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
322
323 if (newMaster != null) {
324 return setMaster(newMaster, deviceId);
325 }
326 return relinquishRole(nodeId, deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800327 }
328
329 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700330 public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800331 checkArgument(nodeId != null, NODE_ID_NULL);
332 checkArgument(deviceId != null, DEVICE_ID_NULL);
333
334 if (!nodeId.equals(localNodeId)) {
335 log.debug("Forwarding request to relinquish "
336 + "role for device {} to {}", deviceId, nodeId);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700337 return clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700338 deviceId,
339 ROLE_RELINQUISH_SUBJECT,
340 SERIALIZER::encode,
341 SERIALIZER::decode,
Madan Jampanif7536ab2015-05-07 23:23:23 -0700342 nodeId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800343 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700344 return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
345 }
346
347 private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
348 checkArgument(deviceId != null, DEVICE_ID_NULL);
Madan Jampani84b6b402015-02-25 17:49:54 -0800349
350 // Check if this node is can be managed by this node.
351 if (!connectedDevices.contains(deviceId)) {
352 return null;
353 }
354
355 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700356 NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
Madan Jampani84b6b402015-02-25 17:49:54 -0800357
Madan Jampani9bd1f152015-04-30 23:33:35 -0700358 MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
359 ? MastershipEvent.Type.MASTER_CHANGED
360 : MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -0800361
362 connectedDevices.remove(deviceId);
363 leadershipService.withdraw(leadershipTopic);
364
365 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
366 }
367
Madan Jampani9bd1f152015-04-30 23:33:35 -0700368 private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700369 checkArgument(deviceId != null, DEVICE_ID_NULL);
370
371 NodeId currentMaster = getMaster(deviceId);
372 if (currentMaster == null) {
373 return null;
374 }
375
376 if (!currentMaster.equals(localNodeId)) {
377 log.info("Forwarding request to relinquish "
378 + "mastership for device {} to {}", deviceId, currentMaster);
379 return futureGetOrElse(clusterCommunicator.sendAndReceive(
380 deviceId,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700381 TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700382 SERIALIZER::encode,
383 SERIALIZER::decode,
384 currentMaster), null);
385 }
386
Madan Jampani9bd1f152015-04-30 23:33:35 -0700387 return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
388 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700389 }
390
Madan Jampani84b6b402015-02-25 17:49:54 -0800391 @Override
392 public void relinquishAllRole(NodeId nodeId) {
393 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
394 }
395
Madan Jampani84b6b402015-02-25 17:49:54 -0800396 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
397 @Override
398 public void event(LeadershipEvent event) {
399 Leadership leadership = event.subject();
400 if (!isDeviceMastershipTopic(leadership.topic())) {
401 return;
402 }
403 NodeId nodeId = leadership.leader();
404 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
405 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
406 switch (event.type()) {
407 case LEADER_ELECTED:
408 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
409 break;
410 case LEADER_REELECTED:
411 // There is no concept of leader re-election in the new distributed leadership manager.
412 throw new IllegalStateException("Unexpected event type");
413 case LEADER_BOOTED:
Madan Jampania2a229c2015-05-11 17:16:30 -0700414 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800415 break;
Madan Jampani0f6ad142015-05-13 17:10:04 -0700416 case CANDIDATES_CHANGED:
417 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
418 break;
Madan Jampani84b6b402015-02-25 17:49:54 -0800419 default:
420 return;
421 }
422 }
423 }
424 }
425
426 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700427 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800428 }
429
430 private DeviceId extractDeviceIdFromTopic(String topic) {
431 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
432 if (m.matches()) {
433 return DeviceId.deviceId(m.group(1));
434 } else {
435 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
436 }
437 }
438
439 private boolean isDeviceMastershipTopic(String topic) {
440 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
441 return m.matches();
442 }
443
Madan Jampanic26eede2015-04-16 11:42:16 -0700444}