blob: 5d9f45379440d3ff04b62fe731dd1fe8bb8930c4 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Madan Jampani84b6b402015-02-25 17:49:54 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Madan Jampani0f6ad142015-05-13 17:10:04 -070019import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
Jon Hall7a8bfc62016-05-26 17:59:04 -070021import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
Madan Jampani84b6b402015-02-25 17:49:54 -080022import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampanif7536ab2015-05-07 23:23:23 -070028import java.util.concurrent.CompletableFuture;
Madan Jampani84b6b402015-02-25 17:49:54 -080029import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
Madan Jampanif7536ab2015-05-07 23:23:23 -070031import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.TimeUnit;
Madan Jampani84b6b402015-02-25 17:49:54 -080033import java.util.regex.Matcher;
34import java.util.regex.Pattern;
35import java.util.stream.Collectors;
36
37import org.apache.felix.scr.annotations.Activate;
38import org.apache.felix.scr.annotations.Component;
39import org.apache.felix.scr.annotations.Deactivate;
40import org.apache.felix.scr.annotations.Reference;
41import org.apache.felix.scr.annotations.ReferenceCardinality;
42import org.apache.felix.scr.annotations.Service;
43import org.onlab.util.KryoNamespace;
44import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.Leadership;
Madan Jampani620f70d2016-01-30 22:22:47 -080046import org.onosproject.cluster.LeadershipAdminService;
Madan Jampani84b6b402015-02-25 17:49:54 -080047import org.onosproject.cluster.LeadershipEvent;
48import org.onosproject.cluster.LeadershipEventListener;
49import org.onosproject.cluster.LeadershipService;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.cluster.RoleInfo;
52import org.onosproject.mastership.MastershipEvent;
53import org.onosproject.mastership.MastershipStore;
54import org.onosproject.mastership.MastershipStoreDelegate;
55import org.onosproject.mastership.MastershipTerm;
56import org.onosproject.net.DeviceId;
57import org.onosproject.net.MastershipRole;
58import org.onosproject.store.AbstractStore;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani84b6b402015-02-25 17:49:54 -080060import org.onosproject.store.cluster.messaging.MessageSubject;
61import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman2c83a102017-08-20 17:11:41 -070062import org.onosproject.store.service.Serializer;
Jordan Halterman980a8c12017-09-22 18:01:19 -070063import org.onosproject.upgrade.UpgradeService;
Madan Jampani84b6b402015-02-25 17:49:54 -080064import org.slf4j.Logger;
65
66import com.google.common.base.Objects;
Madan Jampani620f70d2016-01-30 22:22:47 -080067import com.google.common.collect.ImmutableList;
Madan Jampani84b6b402015-02-25 17:49:54 -080068import com.google.common.collect.Lists;
69import com.google.common.collect.Maps;
Madan Jampani84b6b402015-02-25 17:49:54 -080070
71/**
72 * Implementation of the MastershipStore on top of Leadership Service.
73 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070074@Component(immediate = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080075@Service
76public class ConsistentDeviceMastershipStore
77 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
78 implements MastershipStore {
79
80 private final Logger log = getLogger(getClass());
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected LeadershipService leadershipService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani620f70d2016-01-30 22:22:47 -080086 protected LeadershipAdminService leadershipAdminService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani84b6b402015-02-25 17:49:54 -080089 protected ClusterService clusterService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected ClusterCommunicationService clusterCommunicator;
93
Jordan Halterman980a8c12017-09-22 18:01:19 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected UpgradeService upgradeService;
96
Madan Jampani84b6b402015-02-25 17:49:54 -080097 private NodeId localNodeId;
Madan Jampani84b6b402015-02-25 17:49:54 -080098
Madan Jampani84b6b402015-02-25 17:49:54 -080099 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
100 new MessageSubject("mastership-store-device-role-relinquish");
101
102 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -0700103 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -0800104
Madan Jampani71c32ca2016-06-22 08:23:18 -0700105 private ExecutorService eventHandler;
Madan Jampani84b6b402015-02-25 17:49:54 -0800106 private ExecutorService messageHandlingExecutor;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700107 private ScheduledExecutorService transferExecutor;
Madan Jampani84b6b402015-02-25 17:49:54 -0800108 private final LeadershipEventListener leadershipEventListener =
109 new InternalDeviceMastershipEventListener();
110
111 private static final String NODE_ID_NULL = "Node ID cannot be null";
Madan Jampanif7536ab2015-05-07 23:23:23 -0700112 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
113 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
Madan Jampani84b6b402015-02-25 17:49:54 -0800114
Jordan Halterman2c83a102017-08-20 17:11:41 -0700115 public static final Serializer SERIALIZER = Serializer.using(
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700116 KryoNamespace.newBuilder()
Madan Jampani84b6b402015-02-25 17:49:54 -0800117 .register(KryoNamespaces.API)
118 .register(MastershipRole.class)
119 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700120 .register(MastershipEvent.Type.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700121 .build("MastershipStore"));
Madan Jampani84b6b402015-02-25 17:49:54 -0800122
123 @Activate
124 public void activate() {
Madan Jampani71c32ca2016-06-22 08:23:18 -0700125
126 eventHandler = Executors.newSingleThreadExecutor(
127 groupedThreads("onos/store/device/mastership", "event-handler", log));
128
Madan Jampani84b6b402015-02-25 17:49:54 -0800129 messageHandlingExecutor =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700130 Executors.newSingleThreadExecutor(
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800131 groupedThreads("onos/store/device/mastership", "message-handler", log));
Madan Jampanif7536ab2015-05-07 23:23:23 -0700132 transferExecutor =
133 Executors.newSingleThreadScheduledExecutor(
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800134 groupedThreads("onos/store/device/mastership", "mastership-transfer-executor", log));
Madan Jampanibbed40422015-05-20 12:00:38 -0700135 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700136 SERIALIZER::decode,
Madan Jampanif7536ab2015-05-07 23:23:23 -0700137 this::relinquishLocalRole,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700138 SERIALIZER::encode,
139 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800140 localNodeId = clusterService.getLocalNode().id();
141 leadershipService.addListener(leadershipEventListener);
142
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700143 log.info("Started");
Madan Jampani84b6b402015-02-25 17:49:54 -0800144 }
145
146 @Deactivate
147 public void deactivate() {
Madan Jampani84b6b402015-02-25 17:49:54 -0800148 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani71c32ca2016-06-22 08:23:18 -0700149 leadershipService.removeListener(leadershipEventListener);
Madan Jampani84b6b402015-02-25 17:49:54 -0800150 messageHandlingExecutor.shutdown();
Madan Jampanif7536ab2015-05-07 23:23:23 -0700151 transferExecutor.shutdown();
Madan Jampani71c32ca2016-06-22 08:23:18 -0700152 eventHandler.shutdown();
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700153 log.info("Stopped");
Madan Jampani84b6b402015-02-25 17:49:54 -0800154 }
155
156 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700157 public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800158 checkArgument(deviceId != null, DEVICE_ID_NULL);
159
160 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800161 Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700162 NodeId leader = leadership == null ? null : leadership.leaderNodeId();
163 List<NodeId> candidates = leadership == null ?
164 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
165 MastershipRole role = Objects.equal(localNodeId, leader) ?
166 MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
167 return CompletableFuture.completedFuture(role);
Madan Jampani84b6b402015-02-25 17:49:54 -0800168 }
169
170 @Override
171 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
172 checkArgument(nodeId != null, NODE_ID_NULL);
173 checkArgument(deviceId != null, DEVICE_ID_NULL);
174
175 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800176 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
177 NodeId leader = leadership == null ? null : leadership.leaderNodeId();
178 List<NodeId> candidates = leadership == null ?
179 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
180 return Objects.equal(nodeId, leader) ?
181 MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
Madan Jampani84b6b402015-02-25 17:49:54 -0800182 }
183
184 @Override
185 public NodeId getMaster(DeviceId deviceId) {
186 checkArgument(deviceId != null, DEVICE_ID_NULL);
187
Madan Jampani620f70d2016-01-30 22:22:47 -0800188 return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
Madan Jampani84b6b402015-02-25 17:49:54 -0800189 }
190
191 @Override
192 public RoleInfo getNodes(DeviceId deviceId) {
193 checkArgument(deviceId != null, DEVICE_ID_NULL);
194
195 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
Madan Jampani620f70d2016-01-30 22:22:47 -0800196 clusterService.getNodes()
197 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800198
199 NodeId master = null;
200 final List<NodeId> standbys = Lists.newLinkedList();
201
Madan Jampani86940d92015-05-06 11:47:57 -0700202 List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
203
Madan Jampani84b6b402015-02-25 17:49:54 -0800204 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
205 if (entry.getValue() == MastershipRole.MASTER) {
206 master = entry.getKey();
207 } else if (entry.getValue() == MastershipRole.STANDBY) {
208 standbys.add(entry.getKey());
209 }
210 }
211
Madan Jampani86940d92015-05-06 11:47:57 -0700212 List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
213
214 return new RoleInfo(master, sortedStandbyList);
Madan Jampani84b6b402015-02-25 17:49:54 -0800215 }
216
217 @Override
218 public Set<DeviceId> getDevices(NodeId nodeId) {
219 checkArgument(nodeId != null, NODE_ID_NULL);
220
Yuta HIGUCHId9340032017-01-25 09:25:44 -0800221 // FIXME This result contains REMOVED device.
222 // MastershipService cannot listen to DeviceEvent to GC removed topic,
223 // since DeviceManager depend on it.
224 // Reference count, etc. at LeadershipService layer?
Madan Jampani84b6b402015-02-25 17:49:54 -0800225 return leadershipService
226 .ownedTopics(nodeId)
227 .stream()
228 .filter(this::isDeviceMastershipTopic)
229 .map(this::extractDeviceIdFromTopic)
230 .collect(Collectors.toSet());
231 }
232
233 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700234 public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800235 checkArgument(nodeId != null, NODE_ID_NULL);
236 checkArgument(deviceId != null, DEVICE_ID_NULL);
237
Madan Jampani620f70d2016-01-30 22:22:47 -0800238 String leadershipTopic = createDeviceMastershipTopic(deviceId);
239 if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
240 transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
241 WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
Madan Jampani1af8e132015-04-30 16:41:18 -0700242 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700243 return CompletableFuture.completedFuture(null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800244 }
245
246 @Override
247 public MastershipTerm getTermFor(DeviceId deviceId) {
248 checkArgument(deviceId != null, DEVICE_ID_NULL);
249
250 String leadershipTopic = createDeviceMastershipTopic(deviceId);
251 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
Madan Jampanidbe8a812016-01-31 21:10:46 -0800252 return leadership != null && leadership.leaderNodeId() != null ?
253 MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
Madan Jampani84b6b402015-02-25 17:49:54 -0800254 }
255
256 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700257 public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800258 checkArgument(nodeId != null, NODE_ID_NULL);
259 checkArgument(deviceId != null, DEVICE_ID_NULL);
260
Madan Jampani1af8e132015-04-30 16:41:18 -0700261 NodeId currentMaster = getMaster(deviceId);
262 if (!nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700263 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700264 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700265
266 String leadershipTopic = createDeviceMastershipTopic(deviceId);
267 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
268
269 NodeId newMaster = candidates.stream()
270 .filter(candidate -> !Objects.equal(nodeId, candidate))
271 .findFirst()
272 .orElse(null);
273 log.info("Transitioning to role {} for {}. Next master: {}",
274 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
275
276 if (newMaster != null) {
277 return setMaster(newMaster, deviceId);
278 }
279 return relinquishRole(nodeId, deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800280 }
281
282 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700283 public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800284 checkArgument(nodeId != null, NODE_ID_NULL);
285 checkArgument(deviceId != null, DEVICE_ID_NULL);
286
Madan Jampanibbed40422015-05-20 12:00:38 -0700287 if (nodeId.equals(localNodeId)) {
288 return relinquishLocalRole(deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800289 }
Madan Jampanibbed40422015-05-20 12:00:38 -0700290
291 log.debug("Forwarding request to relinquish "
292 + "role for device {} to {}", deviceId, nodeId);
293 return clusterCommunicator.sendAndReceive(
294 deviceId,
295 ROLE_RELINQUISH_SUBJECT,
296 SERIALIZER::encode,
297 SERIALIZER::decode,
298 nodeId);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700299 }
300
Madan Jampanibbed40422015-05-20 12:00:38 -0700301 private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700302 checkArgument(deviceId != null, DEVICE_ID_NULL);
Madan Jampani84b6b402015-02-25 17:49:54 -0800303
Madan Jampani620f70d2016-01-30 22:22:47 -0800304 String leadershipTopic = createDeviceMastershipTopic(deviceId);
305 if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
Madan Jampanibbed40422015-05-20 12:00:38 -0700306 return CompletableFuture.completedFuture(null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800307 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800308 MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
309 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
310 leadershipService.withdraw(leadershipTopic);
311 return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
Madan Jampani1af8e132015-04-30 16:41:18 -0700312 }
313
Madan Jampani84b6b402015-02-25 17:49:54 -0800314 @Override
315 public void relinquishAllRole(NodeId nodeId) {
Madan Jampani620f70d2016-01-30 22:22:47 -0800316 // Noop. LeadershipService already takes care of detecting and purging stale locks.
Madan Jampani84b6b402015-02-25 17:49:54 -0800317 }
318
Madan Jampani84b6b402015-02-25 17:49:54 -0800319 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
Madan Jampani620f70d2016-01-30 22:22:47 -0800320
321 @Override
322 public boolean isRelevant(LeadershipEvent event) {
323 Leadership leadership = event.subject();
324 return isDeviceMastershipTopic(leadership.topic());
325 }
326
Madan Jampani84b6b402015-02-25 17:49:54 -0800327 @Override
328 public void event(LeadershipEvent event) {
Madan Jampani71c32ca2016-06-22 08:23:18 -0700329 eventHandler.execute(() -> handleEvent(event));
330 }
331
332 private void handleEvent(LeadershipEvent event) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800333 Leadership leadership = event.subject();
Madan Jampani84b6b402015-02-25 17:49:54 -0800334 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
Jon Hall7a8bfc62016-05-26 17:59:04 -0700335 RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
336 getNodes(deviceId) : new RoleInfo();
Thomas Vachuska4b839c72015-05-18 15:43:03 -0700337 switch (event.type()) {
Madan Jampani620f70d2016-01-30 22:22:47 -0800338 case LEADER_AND_CANDIDATES_CHANGED:
339 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
340 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
Thomas Vachuska4b839c72015-05-18 15:43:03 -0700341 break;
Madan Jampani620f70d2016-01-30 22:22:47 -0800342 case LEADER_CHANGED:
343 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
Thomas Vachuska4b839c72015-05-18 15:43:03 -0700344 break;
345 case CANDIDATES_CHANGED:
Madan Jampani620f70d2016-01-30 22:22:47 -0800346 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
Thomas Vachuska4b839c72015-05-18 15:43:03 -0700347 break;
Jon Hall7a8bfc62016-05-26 17:59:04 -0700348 case SERVICE_DISRUPTED:
349 notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
350 break;
351 case SERVICE_RESTORED:
352 // Do nothing, wait for updates from peers
353 break;
Thomas Vachuska4b839c72015-05-18 15:43:03 -0700354 default:
355 return;
Madan Jampani84b6b402015-02-25 17:49:54 -0800356 }
357 }
358 }
359
360 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700361 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800362 }
363
364 private DeviceId extractDeviceIdFromTopic(String topic) {
365 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
366 if (m.matches()) {
367 return DeviceId.deviceId(m.group(1));
368 } else {
369 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
370 }
371 }
372
373 private boolean isDeviceMastershipTopic(String topic) {
374 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
375 return m.matches();
376 }
Madan Jampanidbe8a812016-01-31 21:10:46 -0800377}