blob: 1d30607e695bcd007c4453d5855a907576b8cde9 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
alshabib10580802015-02-18 18:30:33 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080025import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080027import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080030import org.onosproject.core.DefaultGroupId;
31import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.net.MastershipRole;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.flow.instructions.L0ModificationInstruction;
40import org.onosproject.net.flow.instructions.L2ModificationInstruction;
41import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070043import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.Group;
47import org.onosproject.net.group.Group.GroupState;
48import org.onosproject.net.group.GroupBucket;
49import org.onosproject.net.group.GroupBuckets;
50import org.onosproject.net.group.GroupDescription;
51import org.onosproject.net.group.GroupEvent;
52import org.onosproject.net.group.GroupEvent.Type;
53import org.onosproject.net.group.GroupKey;
54import org.onosproject.net.group.GroupOperation;
55import org.onosproject.net.group.GroupStore;
56import org.onosproject.net.group.GroupStoreDelegate;
57import org.onosproject.net.group.StoredGroupEntry;
58import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070059import org.onosproject.store.Timestamp;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070063import org.onosproject.store.impl.MultiValuedTimestamp;
64import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070065import org.onosproject.store.service.ClockService;
66import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapBuilder;
68import org.onosproject.store.service.EventuallyConsistentMapEvent;
69import org.onosproject.store.service.EventuallyConsistentMapListener;
70import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080071import org.slf4j.Logger;
72
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.net.URI;
74import java.util.ArrayList;
75import java.util.HashMap;
76import java.util.List;
77import java.util.Objects;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.ConcurrentMap;
80import java.util.concurrent.ExecutorService;
81import java.util.concurrent.Executors;
82import java.util.concurrent.atomic.AtomicInteger;
83import java.util.concurrent.atomic.AtomicLong;
84import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
90/**
 * Manages inventory of group entries using eventually consistent maps shared across the cluster.
92 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 protected StorageService storageService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700114 protected MastershipService mastershipService;
115
116 // Per device group table with (device id + app cookie) as key
117 private EventuallyConsistentMap<GroupStoreKeyMapKey,
118 StoredGroupEntry> groupStoreEntriesByKey = null;
119 // Per device group table with (device id + group id) as key
120 private EventuallyConsistentMap<GroupStoreIdMapKey,
121 StoredGroupEntry> groupStoreEntriesById = null;
122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
125 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private ExecutorService messageHandlingExecutor;
127 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800128
129 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
130 new HashMap<DeviceId, Boolean>();
131
132 private final AtomicInteger groupIdGen = new AtomicInteger();
133
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 private KryoNamespace.Builder kryoBuilder = null;
135
alshabib10580802015-02-18 18:30:33 -0800136 @Activate
137 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 kryoBuilder = new KryoNamespace.Builder()
139 .register(DefaultGroup.class,
140 DefaultGroupBucket.class,
141 DefaultGroupDescription.class,
142 DefaultGroupKey.class,
143 GroupDescription.Type.class,
144 Group.GroupState.class,
145 GroupBuckets.class,
146 DefaultGroupId.class,
147 GroupStoreMessage.class,
148 GroupStoreMessage.Type.class,
149 UpdateType.class,
150 GroupStoreMessageSubjects.class,
151 MultiValuedTimestamp.class,
152 GroupStoreKeyMapKey.class,
153 GroupStoreIdMapKey.class,
154 GroupStoreMapKey.class
155 )
156 .register(URI.class)
157 .register(DeviceId.class)
158 .register(PortNumber.class)
159 .register(DefaultApplicationId.class)
160 .register(DefaultTrafficTreatment.class,
161 Instructions.DropInstruction.class,
162 Instructions.OutputInstruction.class,
163 Instructions.GroupInstruction.class,
164 Instructions.TableTypeTransition.class,
165 FlowRule.Type.class,
166 L0ModificationInstruction.class,
167 L0ModificationInstruction.L0SubType.class,
168 L0ModificationInstruction.ModLambdaInstruction.class,
169 L2ModificationInstruction.class,
170 L2ModificationInstruction.L2SubType.class,
171 L2ModificationInstruction.ModEtherInstruction.class,
172 L2ModificationInstruction.PushHeaderInstructions.class,
173 L2ModificationInstruction.ModVlanIdInstruction.class,
174 L2ModificationInstruction.ModVlanPcpInstruction.class,
175 L2ModificationInstruction.ModMplsLabelInstruction.class,
176 L2ModificationInstruction.ModMplsTtlInstruction.class,
177 L3ModificationInstruction.class,
178 L3ModificationInstruction.L3SubType.class,
179 L3ModificationInstruction.ModIPInstruction.class,
180 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
181 L3ModificationInstruction.ModTtlInstruction.class,
182 org.onlab.packet.MplsLabel.class
183 )
184 .register(org.onosproject.cluster.NodeId.class)
185 .register(KryoNamespaces.BASIC)
186 .register(KryoNamespaces.MISC);
187
188 messageHandlingExecutor = Executors.
189 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
190 groupedThreads("onos/store/group",
191 "message-handlers"));
192 clusterCommunicator.
193 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
194 new ClusterGroupMsgHandler(),
195 messageHandlingExecutor);
196
197 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700198 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
199 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
200
201 groupStoreEntriesByKey = keyMapBuilder
202 .withName("groupstorekeymap")
203 .withSerializer(kryoBuilder)
204 .withClockService(new GroupStoreLogicalClockManager<>())
205 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700206 log.trace("Current size {}", groupStoreEntriesByKey.size());
207
208 log.debug("Creating EC map groupstoreidmap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700209 EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
210 idMapBuilder = storageService.eventuallyConsistentMapBuilder();
211
212 groupStoreEntriesById = idMapBuilder
213 .withName("groupstoreidmap")
214 .withSerializer(kryoBuilder)
215 .withClockService(new GroupStoreLogicalClockManager<>())
216 .build();
217
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
219 log.trace("Current size {}", groupStoreEntriesById.size());
220
221 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700222 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
223 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
224
225 auditPendingReqQueue = auditMapBuilder
226 .withName("pendinggroupkeymap")
227 .withSerializer(kryoBuilder)
228 .withClockService(new GroupStoreLogicalClockManager<>())
229 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230 log.trace("Current size {}", auditPendingReqQueue.size());
231
alshabib10580802015-02-18 18:30:33 -0800232 log.info("Started");
233 }
234
235 @Deactivate
236 public void deactivate() {
alshabib10580802015-02-18 18:30:33 -0800237 log.info("Stopped");
238 }
239
alshabib10580802015-02-18 18:30:33 -0800240 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700241 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800242 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
243 }
244
245 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700246 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800247 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700248 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800249 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700250 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
251 getGroupStoreKeyMap() {
252 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800253 }
254
255 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 * Returns the group store eventual consistent id map.
alshabib10580802015-02-18 18:30:33 -0800257 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700258 * @return Map representing group id table.
alshabib10580802015-02-18 18:30:33 -0800259 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700260 private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
261 getGroupStoreIdMap() {
262 return groupStoreEntriesById;
alshabib10580802015-02-18 18:30:33 -0800263 }
264
265 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700266 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800267 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700268 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800269 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700270 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
271 getPendingGroupKeyTable() {
272 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800273 }
274
275 /**
276 * Returns the extraneous group id table for specified device.
277 *
278 * @param deviceId identifier of the device
279 * @return Map representing group key table of given device.
280 */
281 private ConcurrentMap<GroupId, Group>
282 getExtraneousGroupIdTable(DeviceId deviceId) {
283 return createIfAbsentUnchecked(extraneousGroupEntriesById,
284 deviceId,
285 lazyEmptyExtraneousGroupIdTable());
286 }
287
288 /**
289 * Returns the number of groups for the specified device in the store.
290 *
291 * @return number of groups for the specified device
292 */
293 @Override
294 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700295 return (getGroups(deviceId) != null) ?
296 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800297 }
298
299 /**
300 * Returns the groups associated with a device.
301 *
302 * @param deviceId the device ID
303 *
304 * @return the group entries
305 */
306 @Override
307 public Iterable<Group> getGroups(DeviceId deviceId) {
308 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700309 log.trace("getGroups: for device {} total number of groups {}",
310 deviceId, getGroupStoreKeyMap().values().size());
311 return FluentIterable.from(getGroupStoreKeyMap().values())
312 .filter(input -> input.deviceId().equals(deviceId))
313 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800314 }
315
316 /**
317 * Returns the stored group entry.
318 *
319 * @param deviceId the device ID
320 * @param appCookie the group key
321 *
322 * @return a group associated with the key
323 */
324 @Override
325 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700326 return getStoredGroupEntry(deviceId, appCookie);
327 }
328
329 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
330 GroupKey appCookie) {
331 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
332 appCookie));
333 }
334
335 @Override
336 public Group getGroup(DeviceId deviceId, GroupId groupId) {
337 return getStoredGroupEntry(deviceId, groupId);
338 }
339
340 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
341 GroupId groupId) {
342 return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
343 groupId));
alshabib10580802015-02-18 18:30:33 -0800344 }
345
346 private int getFreeGroupIdValue(DeviceId deviceId) {
347 int freeId = groupIdGen.incrementAndGet();
348
349 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700350 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800351 if (existing == null) {
352 existing = (
353 extraneousGroupEntriesById.get(deviceId) != null) ?
354 extraneousGroupEntriesById.get(deviceId).
355 get(new DefaultGroupId(freeId)) :
356 null;
357 }
358 if (existing != null) {
359 freeId = groupIdGen.incrementAndGet();
360 } else {
361 break;
362 }
363 }
364 return freeId;
365 }
366
367 /**
368 * Stores a new group entry using the information from group description.
369 *
370 * @param groupDesc group description to be used to create group entry
371 */
372 @Override
373 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700374 log.trace("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800375 // Check if a group is existing with the same key
376 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700377 log.warn("Group already exists with the same key {}",
378 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800379 return;
380 }
381
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700382 // Check if group to be created by a remote instance
383 if (mastershipService.getLocalRole(
384 groupDesc.deviceId()) != MastershipRole.MASTER) {
385 log.debug("Device {} local role is not MASTER",
386 groupDesc.deviceId());
387 GroupStoreMessage groupOp = GroupStoreMessage.
388 createGroupAddRequestMsg(groupDesc.deviceId(),
389 groupDesc);
390 ClusterMessage message = new ClusterMessage(
391 clusterService.getLocalNode().id(),
392 GroupStoreMessageSubjects.
393 REMOTE_GROUP_OP_REQUEST,
394 kryoBuilder.build().serialize(groupOp));
395 if (!clusterCommunicator.unicast(message,
396 mastershipService.
397 getMasterFor(
398 groupDesc.deviceId()))) {
399 log.warn("Failed to send request to master: {} to {}",
400 message,
401 mastershipService.getMasterFor(groupDesc.deviceId()));
402 //TODO: Send Group operation failure event
403 }
404 log.debug("Sent Group operation request for device {} "
405 + "to remote MASTER {}",
406 groupDesc.deviceId(),
407 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800408 return;
409 }
410
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700411 log.debug("Store group for device {} is getting handled locally",
412 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800413 storeGroupDescriptionInternal(groupDesc);
414 }
415
416 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
417 // Check if a group is existing with the same key
418 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
419 return;
420 }
421
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700422 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
423 // Device group audit has not completed yet
424 // Add this group description to pending group key table
425 // Create a group entry object with Dummy Group ID
426 log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
427 + "pending...Queuing Group ADD request",
428 groupDesc.deviceId());
429 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
430 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
431 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
432 getPendingGroupKeyTable();
433 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
434 groupDesc.appCookie()),
435 group);
436 return;
437 }
438
alshabib10580802015-02-18 18:30:33 -0800439 // Get a new group identifier
440 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
441 // Create a group entry object
442 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700443 // Insert the newly created group entry into key and id maps
444 getGroupStoreKeyMap().
445 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
446 groupDesc.appCookie()), group);
447 getGroupStoreIdMap().
448 put(new GroupStoreIdMapKey(groupDesc.deviceId(),
449 id), group);
alshabib10580802015-02-18 18:30:33 -0800450 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
451 group));
452 }
453
454 /**
455 * Updates the existing group entry with the information
456 * from group description.
457 *
458 * @param deviceId the device ID
459 * @param oldAppCookie the current group key
460 * @param type update type
461 * @param newBuckets group buckets for updates
462 * @param newAppCookie optional new group key
463 */
464 @Override
465 public void updateGroupDescription(DeviceId deviceId,
466 GroupKey oldAppCookie,
467 UpdateType type,
468 GroupBuckets newBuckets,
469 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700470 // Check if group update to be done by a remote instance
471 if (mastershipService.
472 getLocalRole(deviceId) != MastershipRole.MASTER) {
473 GroupStoreMessage groupOp = GroupStoreMessage.
474 createGroupUpdateRequestMsg(deviceId,
475 oldAppCookie,
476 type,
477 newBuckets,
478 newAppCookie);
479 ClusterMessage message =
480 new ClusterMessage(clusterService.getLocalNode().id(),
481 GroupStoreMessageSubjects.
482 REMOTE_GROUP_OP_REQUEST,
483 kryoBuilder.build().serialize(groupOp));
484 if (!clusterCommunicator.unicast(message,
485 mastershipService.
486 getMasterFor(deviceId))) {
487 log.warn("Failed to send request to master: {} to {}",
488 message,
489 mastershipService.getMasterFor(deviceId));
490 //TODO: Send Group operation failure event
491 }
492 return;
493 }
494 updateGroupDescriptionInternal(deviceId,
495 oldAppCookie,
496 type,
497 newBuckets,
498 newAppCookie);
499 }
500
501 private void updateGroupDescriptionInternal(DeviceId deviceId,
502 GroupKey oldAppCookie,
503 UpdateType type,
504 GroupBuckets newBuckets,
505 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800506 // Check if a group is existing with the provided key
507 Group oldGroup = getGroup(deviceId, oldAppCookie);
508 if (oldGroup == null) {
509 return;
510 }
511
512 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
513 type,
514 newBuckets);
515 if (newBucketList != null) {
516 // Create a new group object from the old group
517 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
518 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
519 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
520 oldGroup.deviceId(),
521 oldGroup.type(),
522 updatedBuckets,
523 newCookie,
524 oldGroup.appId());
525 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
526 updatedGroupDesc);
527 newGroup.setState(GroupState.PENDING_UPDATE);
528 newGroup.setLife(oldGroup.life());
529 newGroup.setPackets(oldGroup.packets());
530 newGroup.setBytes(oldGroup.bytes());
531 // Remove the old entry from maps and add new entry using new key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700532 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
533 oldGroup.appCookie()));
534 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
535 oldGroup.id()));
536 getGroupStoreKeyMap().
537 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
538 newGroup.appCookie()), newGroup);
539 getGroupStoreIdMap().
540 put(new GroupStoreIdMapKey(newGroup.deviceId(),
541 newGroup.id()), newGroup);
542
alshabib10580802015-02-18 18:30:33 -0800543 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
544 }
545 }
546
547 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
548 UpdateType type,
549 GroupBuckets buckets) {
550 GroupBuckets oldBuckets = oldGroup.buckets();
551 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
552 oldBuckets.buckets());
553 boolean groupDescUpdated = false;
554
555 if (type == UpdateType.ADD) {
556 // Check if the any of the new buckets are part of
557 // the old bucket list
558 for (GroupBucket addBucket:buckets.buckets()) {
559 if (!newBucketList.contains(addBucket)) {
560 newBucketList.add(addBucket);
561 groupDescUpdated = true;
562 }
563 }
564 } else if (type == UpdateType.REMOVE) {
565 // Check if the to be removed buckets are part of the
566 // old bucket list
567 for (GroupBucket removeBucket:buckets.buckets()) {
568 if (newBucketList.contains(removeBucket)) {
569 newBucketList.remove(removeBucket);
570 groupDescUpdated = true;
571 }
572 }
573 }
574
575 if (groupDescUpdated) {
576 return newBucketList;
577 } else {
578 return null;
579 }
580 }
581
582 /**
583 * Triggers deleting the existing group entry.
584 *
585 * @param deviceId the device ID
586 * @param appCookie the group key
587 */
588 @Override
589 public void deleteGroupDescription(DeviceId deviceId,
590 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700591 // Check if group to be deleted by a remote instance
592 if (mastershipService.
593 getLocalRole(deviceId) != MastershipRole.MASTER) {
594 GroupStoreMessage groupOp = GroupStoreMessage.
595 createGroupDeleteRequestMsg(deviceId,
596 appCookie);
597 ClusterMessage message =
598 new ClusterMessage(clusterService.getLocalNode().id(),
599 GroupStoreMessageSubjects.
600 REMOTE_GROUP_OP_REQUEST,
601 kryoBuilder.build().serialize(groupOp));
602 if (!clusterCommunicator.unicast(message,
603 mastershipService.
604 getMasterFor(deviceId))) {
605 log.warn("Failed to send request to master: {} to {}",
606 message,
607 mastershipService.getMasterFor(deviceId));
608 //TODO: Send Group operation failure event
609 }
610 return;
611 }
612 deleteGroupDescriptionInternal(deviceId, appCookie);
613 }
614
615 private void deleteGroupDescriptionInternal(DeviceId deviceId,
616 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800617 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700618 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800619 if (existing == null) {
620 return;
621 }
622
623 synchronized (existing) {
624 existing.setState(GroupState.PENDING_DELETE);
625 }
626 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
627 }
628
629 /**
630 * Stores a new group entry, or updates an existing entry.
631 *
632 * @param group group entry
633 */
634 @Override
635 public void addOrUpdateGroupEntry(Group group) {
636 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700637 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
638 group.id());
alshabib10580802015-02-18 18:30:33 -0800639 GroupEvent event = null;
640
641 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700642 log.trace("addOrUpdateGroupEntry: updating group "
643 + "entry {} in device {}",
644 group.id(),
645 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800646 synchronized (existing) {
647 existing.setLife(group.life());
648 existing.setPackets(group.packets());
649 existing.setBytes(group.bytes());
650 if (existing.state() == GroupState.PENDING_ADD) {
651 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700652 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800653 event = new GroupEvent(Type.GROUP_ADDED, existing);
654 } else {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700655 existing.setState(GroupState.ADDED);
656 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800657 event = new GroupEvent(Type.GROUP_UPDATED, existing);
658 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700659 //Re-PUT map entries to trigger map update events
660 getGroupStoreKeyMap().
661 put(new GroupStoreKeyMapKey(existing.deviceId(),
662 existing.appCookie()), existing);
663 getGroupStoreIdMap().
664 put(new GroupStoreIdMapKey(existing.deviceId(),
665 existing.id()), existing);
alshabib10580802015-02-18 18:30:33 -0800666 }
667 }
668
669 if (event != null) {
670 notifyDelegate(event);
671 }
672 }
673
674 /**
675 * Removes the group entry from store.
676 *
677 * @param group group entry
678 */
679 @Override
680 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700681 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
682 group.id());
alshabib10580802015-02-18 18:30:33 -0800683
684 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700685 log.trace("removeGroupEntry: removing group "
686 + "entry {} in device {}",
687 group.id(),
688 group.deviceId());
689 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
690 existing.appCookie()));
691 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
692 existing.id()));
alshabib10580802015-02-18 18:30:33 -0800693 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
694 }
695 }
696
697 @Override
698 public void deviceInitialAuditCompleted(DeviceId deviceId,
699 boolean completed) {
700 synchronized (deviceAuditStatus) {
701 if (completed) {
702 log.debug("deviceInitialAuditCompleted: AUDIT "
703 + "completed for device {}", deviceId);
704 deviceAuditStatus.put(deviceId, true);
705 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700706 List<StoredGroupEntry> pendingGroupRequests =
707 getPendingGroupKeyTable().values()
708 .stream()
709 .filter(g-> g.deviceId().equals(deviceId))
710 .collect(Collectors.toList());
711 log.trace("deviceInitialAuditCompleted: processing "
712 + "pending group add requests for device {} and "
713 + "number of pending requests {}",
714 deviceId,
715 pendingGroupRequests.size());
716 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800717 GroupDescription tmp = new DefaultGroupDescription(
718 group.deviceId(),
719 group.type(),
720 group.buckets(),
721 group.appCookie(),
722 group.appId());
723 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700724 getPendingGroupKeyTable().
725 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800726 }
alshabib10580802015-02-18 18:30:33 -0800727 } else {
728 if (deviceAuditStatus.get(deviceId)) {
729 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
730 + "status for device {}", deviceId);
731 deviceAuditStatus.put(deviceId, false);
732 }
733 }
734 }
735 }
736
737 @Override
738 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
739 synchronized (deviceAuditStatus) {
740 return (deviceAuditStatus.get(deviceId) != null)
741 ? deviceAuditStatus.get(deviceId) : false;
742 }
743 }
744
745 @Override
746 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
747
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700748 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
749 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800750
751 if (existing == null) {
752 log.warn("No group entry with ID {} found ", operation.groupId());
753 return;
754 }
755
756 switch (operation.opType()) {
757 case ADD:
758 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
759 break;
760 case MODIFY:
761 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
762 break;
763 case DELETE:
764 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
765 break;
766 default:
767 log.warn("Unknown group operation type {}", operation.opType());
768 }
769
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700770 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
771 existing.appCookie()));
772 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
773 existing.id()));
alshabib10580802015-02-18 18:30:33 -0800774 }
775
776 @Override
777 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700778 log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
779 + "group entry {} in device {}",
780 group.id(),
781 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800782 ConcurrentMap<GroupId, Group> extraneousIdTable =
783 getExtraneousGroupIdTable(group.deviceId());
784 extraneousIdTable.put(group.id(), group);
785 // Check the reference counter
786 if (group.referenceCount() == 0) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700787 log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
788 + "counter is zero and triggering remove",
789 group.id(),
790 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800791 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
792 }
793 }
794
795 @Override
796 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700797 log.trace("removeExtraneousGroupEntry: remove extraneous "
798 + "group entry {} of device {} from store",
799 group.id(),
800 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800801 ConcurrentMap<GroupId, Group> extraneousIdTable =
802 getExtraneousGroupIdTable(group.deviceId());
803 extraneousIdTable.remove(group.id());
804 }
805
806 @Override
807 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
808 // flatten and make iterator unmodifiable
809 return FluentIterable.from(
810 getExtraneousGroupIdTable(deviceId).values());
811 }
812
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700813 /**
814 * ClockService that generates wallclock based timestamps.
815 */
816 private class GroupStoreLogicalClockManager<T, U>
817 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800818
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700819 private final AtomicLong sequenceNumber = new AtomicLong(0);
820
821 @Override
822 public Timestamp getTimestamp(T t1, U u1) {
823 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
824 sequenceNumber.getAndIncrement());
825 }
826 }
827
828 /**
829 * Map handler to receive any events when the group map is updated.
830 */
831 private class GroupStoreIdMapListener implements
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700832 EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700833
834 @Override
835 public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700836 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700837 GroupEvent groupEvent = null;
838 log.trace("GroupStoreIdMapListener: received groupid map event {}",
839 mapEvent.type());
840 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
841 log.trace("GroupIdMapListener: Received PUT event");
842 if (mapEvent.value().state() == Group.GroupState.ADDED) {
843 if (mapEvent.value().isGroupStateAddedFirstTime()) {
844 groupEvent = new GroupEvent(Type.GROUP_ADDED,
845 mapEvent.value());
846 log.trace("GroupIdMapListener: Received first time "
847 + "GROUP_ADDED state update");
848 } else {
849 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
850 mapEvent.value());
851 log.trace("GroupIdMapListener: Received following "
852 + "GROUP_ADDED state update");
853 }
854 }
855 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
856 log.trace("GroupIdMapListener: Received REMOVE event");
857 groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
858 }
859
860 if (groupEvent != null) {
861 notifyDelegate(groupEvent);
862 }
863 }
864 }
865 /**
866 * Message handler to receive messages from group subsystems of
867 * other cluster members.
868 */
869 private final class ClusterGroupMsgHandler
870 implements ClusterMessageHandler {
871 @Override
872 public void handle(ClusterMessage message) {
873 log.trace("ClusterGroupMsgHandler: received remote group message");
874 if (message.subject() ==
875 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
876 GroupStoreMessage groupOp = kryoBuilder.
877 build().deserialize(message.payload());
878 log.trace("received remote group operation request");
879 if (!(mastershipService.
880 getLocalRole(groupOp.deviceId()) !=
881 MastershipRole.MASTER)) {
882 log.warn("ClusterGroupMsgHandler: This node is not "
883 + "MASTER for device {}", groupOp.deviceId());
884 return;
885 }
886 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
887 log.trace("processing remote group "
888 + "add operation request");
889 storeGroupDescriptionInternal(groupOp.groupDesc());
890 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
891 log.trace("processing remote group "
892 + "update operation request");
893 updateGroupDescriptionInternal(groupOp.deviceId(),
894 groupOp.appCookie(),
895 groupOp.updateType(),
896 groupOp.updateBuckets(),
897 groupOp.newAppCookie());
898 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
899 log.trace("processing remote group "
900 + "delete operation request");
901 deleteGroupDescriptionInternal(groupOp.deviceId(),
902 groupOp.appCookie());
903 }
904 }
905 }
906 }
907
908 /**
909 * Flattened map key to be used to store group entries.
910 */
911 private class GroupStoreMapKey {
912 private final DeviceId deviceId;
913
914 public GroupStoreMapKey(DeviceId deviceId) {
915 this.deviceId = deviceId;
916 }
917
918 @Override
919 public boolean equals(Object o) {
920 if (this == o) {
921 return true;
922 }
923 if (!(o instanceof GroupStoreMapKey)) {
924 return false;
925 }
926 GroupStoreMapKey that = (GroupStoreMapKey) o;
927 return this.deviceId.equals(that.deviceId);
928 }
929
930 @Override
931 public int hashCode() {
932 int result = 17;
933
934 result = 31 * result + Objects.hash(this.deviceId);
935
936 return result;
937 }
938 }
939
940 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
941 private final GroupKey appCookie;
942 public GroupStoreKeyMapKey(DeviceId deviceId,
943 GroupKey appCookie) {
944 super(deviceId);
945 this.appCookie = appCookie;
946 }
947
948 @Override
949 public boolean equals(Object o) {
950 if (this == o) {
951 return true;
952 }
953 if (!(o instanceof GroupStoreKeyMapKey)) {
954 return false;
955 }
956 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
957 return (super.equals(that) &&
958 this.appCookie.equals(that.appCookie));
959 }
960
961 @Override
962 public int hashCode() {
963 int result = 17;
964
965 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
966
967 return result;
968 }
969 }
970
971 private class GroupStoreIdMapKey extends GroupStoreMapKey {
972 private final GroupId groupId;
973 public GroupStoreIdMapKey(DeviceId deviceId,
974 GroupId groupId) {
975 super(deviceId);
976 this.groupId = groupId;
977 }
978
979 @Override
980 public boolean equals(Object o) {
981 if (this == o) {
982 return true;
983 }
984 if (!(o instanceof GroupStoreIdMapKey)) {
985 return false;
986 }
987 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
988 return (super.equals(that) &&
989 this.groupId.equals(that.groupId));
990 }
991
992 @Override
993 public int hashCode() {
994 int result = 17;
995
996 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
997
998 return result;
999 }
1000 }
alshabib10580802015-02-18 18:30:33 -08001001}