blob: 3d852b19b1332a51732a7235edd9b63f7c0c99f3 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani0f6ad142015-05-13 17:10:04 -070020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -080021import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampanif7536ab2015-05-07 23:23:23 -070028import java.util.concurrent.CompletableFuture;
Madan Jampani84b6b402015-02-25 17:49:54 -080029import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
Madan Jampanif7536ab2015-05-07 23:23:23 -070031import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.TimeUnit;
Madan Jampani84b6b402015-02-25 17:49:54 -080033import java.util.regex.Matcher;
34import java.util.regex.Pattern;
35import java.util.stream.Collectors;
36
37import org.apache.felix.scr.annotations.Activate;
38import org.apache.felix.scr.annotations.Component;
39import org.apache.felix.scr.annotations.Deactivate;
40import org.apache.felix.scr.annotations.Reference;
41import org.apache.felix.scr.annotations.ReferenceCardinality;
42import org.apache.felix.scr.annotations.Service;
43import org.onlab.util.KryoNamespace;
44import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.Leadership;
46import org.onosproject.cluster.LeadershipEvent;
47import org.onosproject.cluster.LeadershipEventListener;
48import org.onosproject.cluster.LeadershipService;
49import org.onosproject.cluster.NodeId;
50import org.onosproject.cluster.RoleInfo;
51import org.onosproject.mastership.MastershipEvent;
52import org.onosproject.mastership.MastershipStore;
53import org.onosproject.mastership.MastershipStoreDelegate;
54import org.onosproject.mastership.MastershipTerm;
55import org.onosproject.net.DeviceId;
56import org.onosproject.net.MastershipRole;
57import org.onosproject.store.AbstractStore;
58import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani84b6b402015-02-25 17:49:54 -080059import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.serializers.KryoSerializer;
62import org.onosproject.store.serializers.StoreSerializer;
63import org.slf4j.Logger;
64
65import com.google.common.base.Objects;
66import com.google.common.collect.Lists;
67import com.google.common.collect.Maps;
68import com.google.common.collect.Sets;
69
70/**
71 * Implementation of the MastershipStore on top of Leadership Service.
72 */
Madan Jampani5756c352015-04-29 00:23:58 -070073@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080074@Service
75public class ConsistentDeviceMastershipStore
76 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
77 implements MastershipStore {
78
79 private final Logger log = getLogger(getClass());
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected LeadershipService leadershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterService clusterService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterCommunicationService clusterCommunicator;
89
90 private NodeId localNodeId;
91 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
92
Madan Jampani84b6b402015-02-25 17:49:54 -080093 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
94 new MessageSubject("mastership-store-device-role-relinquish");
Madan Jampani9bd1f152015-04-30 23:33:35 -070095 private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
Madan Jampani1af8e132015-04-30 16:41:18 -070096 new MessageSubject("mastership-store-device-mastership-relinquish");
Madan Jampani84b6b402015-02-25 17:49:54 -080097
98 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -070099 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -0800100
Madan Jampani84b6b402015-02-25 17:49:54 -0800101 private ExecutorService messageHandlingExecutor;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700102 private ScheduledExecutorService transferExecutor;
Madan Jampani84b6b402015-02-25 17:49:54 -0800103 private final LeadershipEventListener leadershipEventListener =
104 new InternalDeviceMastershipEventListener();
105
106 private static final String NODE_ID_NULL = "Node ID cannot be null";
Madan Jampanif7536ab2015-05-07 23:23:23 -0700107 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
108 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
Madan Jampani84b6b402015-02-25 17:49:54 -0800109
110 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
111 @Override
112 protected void setupKryoPool() {
113 serializerPool = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .register(MastershipRole.class)
116 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700117 .register(MastershipEvent.Type.class)
Madan Jampani84b6b402015-02-25 17:49:54 -0800118 .build();
119 }
120 };
121
122 @Activate
123 public void activate() {
124 messageHandlingExecutor =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700125 Executors.newSingleThreadExecutor(
126 groupedThreads("onos/store/device/mastership", "message-handler"));
127 transferExecutor =
128 Executors.newSingleThreadScheduledExecutor(
129 groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
Madan Jampanibbed40422015-05-20 12:00:38 -0700130 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700131 SERIALIZER::decode,
Madan Jampanif7536ab2015-05-07 23:23:23 -0700132 this::relinquishLocalRole,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700133 SERIALIZER::encode,
134 messageHandlingExecutor);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700135 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700136 SERIALIZER::decode,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700137 this::transitionFromMasterToStandby,
Madan Jampani1af8e132015-04-30 16:41:18 -0700138 SERIALIZER::encode,
139 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800140 localNodeId = clusterService.getLocalNode().id();
141 leadershipService.addListener(leadershipEventListener);
142
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700143 log.info("Started");
Madan Jampani84b6b402015-02-25 17:49:54 -0800144 }
145
146 @Deactivate
147 public void deactivate() {
Madan Jampani84b6b402015-02-25 17:49:54 -0800148 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700149 clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
Madan Jampani84b6b402015-02-25 17:49:54 -0800150 messageHandlingExecutor.shutdown();
Madan Jampanif7536ab2015-05-07 23:23:23 -0700151 transferExecutor.shutdown();
Madan Jampani84b6b402015-02-25 17:49:54 -0800152 leadershipService.removeListener(leadershipEventListener);
153
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700154 log.info("Stopped");
Madan Jampani84b6b402015-02-25 17:49:54 -0800155 }
156
157 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700158 public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800159 checkArgument(deviceId != null, DEVICE_ID_NULL);
160
161 String leadershipTopic = createDeviceMastershipTopic(deviceId);
162 if (connectedDevices.add(deviceId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700163 return leadershipService.runForLeadership(leadershipTopic)
164 .thenApply(leadership -> {
165 return Objects.equal(localNodeId, leadership.leader())
166 ? MastershipRole.MASTER : MastershipRole.STANDBY;
167 });
Madan Jampani84b6b402015-02-25 17:49:54 -0800168 } else {
Madan Jampani207528f2015-06-04 13:25:10 -0700169 NodeId leader = leadershipService.getLeader(leadershipTopic);
170 if (Objects.equal(localNodeId, leader)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700171 return CompletableFuture.completedFuture(MastershipRole.MASTER);
Madan Jampani84b6b402015-02-25 17:49:54 -0800172 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700173 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
Madan Jampani84b6b402015-02-25 17:49:54 -0800174 }
175 }
176 }
177
178 @Override
179 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
180 checkArgument(nodeId != null, NODE_ID_NULL);
181 checkArgument(deviceId != null, DEVICE_ID_NULL);
182
183 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani207528f2015-06-04 13:25:10 -0700184 NodeId leader = leadershipService.getLeader(leadershipTopic);
185 if (Objects.equal(localNodeId, leader)) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800186 return MastershipRole.MASTER;
187 }
Madan Jampani1b02f822015-05-19 11:01:49 -0700188 return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
189 MastershipRole.STANDBY : MastershipRole.NONE;
Madan Jampani84b6b402015-02-25 17:49:54 -0800190 }
191
192 @Override
193 public NodeId getMaster(DeviceId deviceId) {
194 checkArgument(deviceId != null, DEVICE_ID_NULL);
195
196 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani207528f2015-06-04 13:25:10 -0700197 return leadershipService.getLeader(leadershipTopic);
Madan Jampani84b6b402015-02-25 17:49:54 -0800198 }
199
200 @Override
201 public RoleInfo getNodes(DeviceId deviceId) {
202 checkArgument(deviceId != null, DEVICE_ID_NULL);
203
204 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
205 clusterService
206 .getNodes()
Madan Jampani84b6b402015-02-25 17:49:54 -0800207 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
208
209 NodeId master = null;
210 final List<NodeId> standbys = Lists.newLinkedList();
211
Madan Jampani86940d92015-05-06 11:47:57 -0700212 List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
213
Madan Jampani84b6b402015-02-25 17:49:54 -0800214 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
215 if (entry.getValue() == MastershipRole.MASTER) {
216 master = entry.getKey();
217 } else if (entry.getValue() == MastershipRole.STANDBY) {
218 standbys.add(entry.getKey());
219 }
220 }
221
Madan Jampani86940d92015-05-06 11:47:57 -0700222 List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
223
224 return new RoleInfo(master, sortedStandbyList);
Madan Jampani84b6b402015-02-25 17:49:54 -0800225 }
226
227 @Override
228 public Set<DeviceId> getDevices(NodeId nodeId) {
229 checkArgument(nodeId != null, NODE_ID_NULL);
230
231 return leadershipService
232 .ownedTopics(nodeId)
233 .stream()
234 .filter(this::isDeviceMastershipTopic)
235 .map(this::extractDeviceIdFromTopic)
236 .collect(Collectors.toSet());
237 }
238
239 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700240 public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800241 checkArgument(nodeId != null, NODE_ID_NULL);
242 checkArgument(deviceId != null, DEVICE_ID_NULL);
243
Madan Jampani1af8e132015-04-30 16:41:18 -0700244 NodeId currentMaster = getMaster(deviceId);
245 if (nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700246 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700247 } else {
248 String leadershipTopic = createDeviceMastershipTopic(deviceId);
249 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
250 if (candidates.isEmpty()) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700251 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700252 }
253 if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700254 CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
255 // There is brief wait before we step down from mastership.
256 // This is to ensure any work that happens when standby preference
257 // order changes can complete. For example: flow entries need to be backed
258 // to the new top standby (ONOS-1883)
259 // FIXME: This potentially introduces a race-condition.
260 // Right now role changes are only forced via CLI.
261 transferExecutor.schedule(() -> {
262 result.complete(transitionFromMasterToStandby(deviceId));
263 }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
264 return result;
Madan Jampani1af8e132015-04-30 16:41:18 -0700265 } else {
266 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
267 }
268 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700269 return CompletableFuture.completedFuture(null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800270 }
271
272 @Override
273 public MastershipTerm getTermFor(DeviceId deviceId) {
274 checkArgument(deviceId != null, DEVICE_ID_NULL);
275
276 String leadershipTopic = createDeviceMastershipTopic(deviceId);
277 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
278 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
279 }
280
281 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700282 public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800283 checkArgument(nodeId != null, NODE_ID_NULL);
284 checkArgument(deviceId != null, DEVICE_ID_NULL);
285
Madan Jampani1af8e132015-04-30 16:41:18 -0700286 NodeId currentMaster = getMaster(deviceId);
287 if (!nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700288 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700289 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700290
291 String leadershipTopic = createDeviceMastershipTopic(deviceId);
292 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
293
294 NodeId newMaster = candidates.stream()
295 .filter(candidate -> !Objects.equal(nodeId, candidate))
296 .findFirst()
297 .orElse(null);
298 log.info("Transitioning to role {} for {}. Next master: {}",
299 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
300
301 if (newMaster != null) {
302 return setMaster(newMaster, deviceId);
303 }
304 return relinquishRole(nodeId, deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800305 }
306
307 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700308 public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800309 checkArgument(nodeId != null, NODE_ID_NULL);
310 checkArgument(deviceId != null, DEVICE_ID_NULL);
311
Madan Jampanibbed40422015-05-20 12:00:38 -0700312 if (nodeId.equals(localNodeId)) {
313 return relinquishLocalRole(deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800314 }
Madan Jampanibbed40422015-05-20 12:00:38 -0700315
316 log.debug("Forwarding request to relinquish "
317 + "role for device {} to {}", deviceId, nodeId);
318 return clusterCommunicator.sendAndReceive(
319 deviceId,
320 ROLE_RELINQUISH_SUBJECT,
321 SERIALIZER::encode,
322 SERIALIZER::decode,
323 nodeId);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700324 }
325
Madan Jampanibbed40422015-05-20 12:00:38 -0700326 private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700327 checkArgument(deviceId != null, DEVICE_ID_NULL);
Madan Jampani84b6b402015-02-25 17:49:54 -0800328
329 // Check if this node is can be managed by this node.
330 if (!connectedDevices.contains(deviceId)) {
Madan Jampanibbed40422015-05-20 12:00:38 -0700331 return CompletableFuture.completedFuture(null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800332 }
333
334 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700335 NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
Madan Jampani84b6b402015-02-25 17:49:54 -0800336
Madan Jampani9bd1f152015-04-30 23:33:35 -0700337 MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
338 ? MastershipEvent.Type.MASTER_CHANGED
339 : MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -0800340
341 connectedDevices.remove(deviceId);
Madan Jampanibbed40422015-05-20 12:00:38 -0700342 return leadershipService.withdraw(leadershipTopic)
343 .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800344 }
345
Madan Jampani9bd1f152015-04-30 23:33:35 -0700346 private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700347 checkArgument(deviceId != null, DEVICE_ID_NULL);
348
349 NodeId currentMaster = getMaster(deviceId);
350 if (currentMaster == null) {
351 return null;
352 }
353
354 if (!currentMaster.equals(localNodeId)) {
355 log.info("Forwarding request to relinquish "
356 + "mastership for device {} to {}", deviceId, currentMaster);
357 return futureGetOrElse(clusterCommunicator.sendAndReceive(
358 deviceId,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700359 TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700360 SERIALIZER::encode,
361 SERIALIZER::decode,
362 currentMaster), null);
363 }
364
Madan Jampani9bd1f152015-04-30 23:33:35 -0700365 return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
366 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700367 }
368
Madan Jampani84b6b402015-02-25 17:49:54 -0800369 @Override
370 public void relinquishAllRole(NodeId nodeId) {
371 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
372 }
373
Madan Jampani84b6b402015-02-25 17:49:54 -0800374 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
375 @Override
376 public void event(LeadershipEvent event) {
377 Leadership leadership = event.subject();
378 if (!isDeviceMastershipTopic(leadership.topic())) {
379 return;
380 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800381 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
Thomas Vachuska4b839c72015-05-18 15:43:03 -0700382 switch (event.type()) {
383 case LEADER_ELECTED:
384 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
385 break;
386 case LEADER_REELECTED:
387 // There is no concept of leader re-election in the new distributed leadership manager.
388 throw new IllegalStateException("Unexpected event type");
389 case LEADER_BOOTED:
390 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
391 break;
392 case CANDIDATES_CHANGED:
393 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
394 break;
395 default:
396 return;
Madan Jampani84b6b402015-02-25 17:49:54 -0800397 }
398 }
399 }
400
401 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700402 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800403 }
404
405 private DeviceId extractDeviceIdFromTopic(String topic) {
406 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
407 if (m.matches()) {
408 return DeviceId.deviceId(m.group(1));
409 } else {
410 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
411 }
412 }
413
414 private boolean isDeviceMastershipTopic(String topic) {
415 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
416 return m.matches();
417 }
418
Madan Jampanic26eede2015-04-16 11:42:16 -0700419}