blob: a571b4e5e6bff74d2142c9c2df609d4a35896694 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
18import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070019import static org.onlab.util.Tools.groupedThreads;
alshabib10580802015-02-18 18:30:33 -080020import static org.slf4j.LoggerFactory.getLogger;
21
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070022import java.net.URI;
alshabib10580802015-02-18 18:30:33 -080023import java.util.ArrayList;
24import java.util.HashMap;
25import java.util.List;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import java.util.Objects;
alshabib10580802015-02-18 18:30:33 -080027import java.util.concurrent.ConcurrentHashMap;
28import java.util.concurrent.ConcurrentMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070029import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
alshabib10580802015-02-18 18:30:33 -080031import java.util.concurrent.atomic.AtomicInteger;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import java.util.concurrent.atomic.AtomicLong;
33import java.util.stream.Collectors;
alshabib10580802015-02-18 18:30:33 -080034
35import org.apache.felix.scr.annotations.Activate;
36import org.apache.felix.scr.annotations.Component;
37import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070038import org.apache.felix.scr.annotations.Reference;
39import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080040import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080042import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070043import org.onosproject.cluster.ClusterService;
44import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080045import org.onosproject.core.DefaultGroupId;
46import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070049import org.onosproject.net.MastershipRole;
50import org.onosproject.net.PortNumber;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.FlowRule;
53import org.onosproject.net.flow.instructions.Instructions;
54import org.onosproject.net.flow.instructions.L0ModificationInstruction;
55import org.onosproject.net.flow.instructions.L2ModificationInstruction;
56import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080057import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080059import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070060import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080061import org.onosproject.net.group.Group;
62import org.onosproject.net.group.Group.GroupState;
63import org.onosproject.net.group.GroupBucket;
64import org.onosproject.net.group.GroupBuckets;
65import org.onosproject.net.group.GroupDescription;
66import org.onosproject.net.group.GroupEvent;
67import org.onosproject.net.group.GroupEvent.Type;
68import org.onosproject.net.group.GroupKey;
69import org.onosproject.net.group.GroupOperation;
70import org.onosproject.net.group.GroupStore;
71import org.onosproject.net.group.GroupStoreDelegate;
72import org.onosproject.net.group.StoredGroupEntry;
73import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070074import org.onosproject.store.Timestamp;
75import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
76import org.onosproject.store.cluster.messaging.ClusterMessage;
77import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
78import org.onosproject.store.ecmap.EventuallyConsistentMap;
79import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
80import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
81import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
82import org.onosproject.store.impl.ClockService;
83import org.onosproject.store.impl.MultiValuedTimestamp;
84import org.onosproject.store.serializers.KryoNamespaces;
alshabib10580802015-02-18 18:30:33 -080085import org.slf4j.Logger;
86
alshabib10580802015-02-18 18:30:33 -080087import com.google.common.collect.FluentIterable;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070088import com.google.common.collect.Iterables;
alshabib10580802015-02-18 18:30:33 -080089
90/**
91 * Manages inventory of group entries using eventually consistent maps shared across the cluster.
92 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected MastershipService mastershipService;
112
113 // Per device group table with (device id + app cookie) as key
114 private EventuallyConsistentMap<GroupStoreKeyMapKey,
115 StoredGroupEntry> groupStoreEntriesByKey = null;
116 // Per device group table with (device id + group id) as key
117 private EventuallyConsistentMap<GroupStoreIdMapKey,
118 StoredGroupEntry> groupStoreEntriesById = null;
119 private EventuallyConsistentMap<GroupStoreKeyMapKey,
120 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800121 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
122 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700123 private ExecutorService messageHandlingExecutor;
124 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800125
126 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
127 new HashMap<DeviceId, Boolean>();
128
129 private final AtomicInteger groupIdGen = new AtomicInteger();
130
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700131 private KryoNamespace.Builder kryoBuilder = null;
132
alshabib10580802015-02-18 18:30:33 -0800133 @Activate
134 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 kryoBuilder = new KryoNamespace.Builder()
136 .register(DefaultGroup.class,
137 DefaultGroupBucket.class,
138 DefaultGroupDescription.class,
139 DefaultGroupKey.class,
140 GroupDescription.Type.class,
141 Group.GroupState.class,
142 GroupBuckets.class,
143 DefaultGroupId.class,
144 GroupStoreMessage.class,
145 GroupStoreMessage.Type.class,
146 UpdateType.class,
147 GroupStoreMessageSubjects.class,
148 MultiValuedTimestamp.class,
149 GroupStoreKeyMapKey.class,
150 GroupStoreIdMapKey.class,
151 GroupStoreMapKey.class
152 )
153 .register(URI.class)
154 .register(DeviceId.class)
155 .register(PortNumber.class)
156 .register(DefaultApplicationId.class)
157 .register(DefaultTrafficTreatment.class,
158 Instructions.DropInstruction.class,
159 Instructions.OutputInstruction.class,
160 Instructions.GroupInstruction.class,
161 Instructions.TableTypeTransition.class,
162 FlowRule.Type.class,
163 L0ModificationInstruction.class,
164 L0ModificationInstruction.L0SubType.class,
165 L0ModificationInstruction.ModLambdaInstruction.class,
166 L2ModificationInstruction.class,
167 L2ModificationInstruction.L2SubType.class,
168 L2ModificationInstruction.ModEtherInstruction.class,
169 L2ModificationInstruction.PushHeaderInstructions.class,
170 L2ModificationInstruction.ModVlanIdInstruction.class,
171 L2ModificationInstruction.ModVlanPcpInstruction.class,
172 L2ModificationInstruction.ModMplsLabelInstruction.class,
173 L2ModificationInstruction.ModMplsTtlInstruction.class,
174 L3ModificationInstruction.class,
175 L3ModificationInstruction.L3SubType.class,
176 L3ModificationInstruction.ModIPInstruction.class,
177 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
178 L3ModificationInstruction.ModTtlInstruction.class,
179 org.onlab.packet.MplsLabel.class
180 )
181 .register(org.onosproject.cluster.NodeId.class)
182 .register(KryoNamespaces.BASIC)
183 .register(KryoNamespaces.MISC);
184
185 messageHandlingExecutor = Executors.
186 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
187 groupedThreads("onos/store/group",
188 "message-handlers"));
189 clusterCommunicator.
190 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
191 new ClusterGroupMsgHandler(),
192 messageHandlingExecutor);
193
194 log.debug("Creating EC map groupstorekeymap");
195 groupStoreEntriesByKey =
196 new EventuallyConsistentMapImpl<>("groupstorekeymap",
197 clusterService,
198 clusterCommunicator,
199 kryoBuilder,
200 new GroupStoreLogicalClockManager<>());
201 log.trace("Current size {}", groupStoreEntriesByKey.size());
202
203 log.debug("Creating EC map groupstoreidmap");
204 groupStoreEntriesById =
205 new EventuallyConsistentMapImpl<>("groupstoreidmap",
206 clusterService,
207 clusterCommunicator,
208 kryoBuilder,
209 new GroupStoreLogicalClockManager<>());
210 groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
211 log.trace("Current size {}", groupStoreEntriesById.size());
212
213 log.debug("Creating EC map pendinggroupkeymap");
214 auditPendingReqQueue =
215 new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
216 clusterService,
217 clusterCommunicator,
218 kryoBuilder,
219 new GroupStoreLogicalClockManager<>());
220 log.trace("Current size {}", auditPendingReqQueue.size());
221
alshabib10580802015-02-18 18:30:33 -0800222 log.info("Started");
223 }
224
225 @Deactivate
226 public void deactivate() {
alshabib10580802015-02-18 18:30:33 -0800227 log.info("Stopped");
228 }
229
alshabib10580802015-02-18 18:30:33 -0800230 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700231 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800232 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
233 }
234
235 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700236 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800237 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700238 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800239 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700240 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
241 getGroupStoreKeyMap() {
242 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800243 }
244
245 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700246 * Returns the group store eventual consistent id map.
alshabib10580802015-02-18 18:30:33 -0800247 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700248 * @return Map representing group id table.
alshabib10580802015-02-18 18:30:33 -0800249 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700250 private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
251 getGroupStoreIdMap() {
252 return groupStoreEntriesById;
alshabib10580802015-02-18 18:30:33 -0800253 }
254
255 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800257 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700258 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800259 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700260 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
261 getPendingGroupKeyTable() {
262 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800263 }
264
265 /**
266 * Returns the extraneous group id table for specified device.
267 *
268 * @param deviceId identifier of the device
269 * @return Map representing group key table of given device.
270 */
271 private ConcurrentMap<GroupId, Group>
272 getExtraneousGroupIdTable(DeviceId deviceId) {
273 return createIfAbsentUnchecked(extraneousGroupEntriesById,
274 deviceId,
275 lazyEmptyExtraneousGroupIdTable());
276 }
277
278 /**
279 * Returns the number of groups for the specified device in the store.
280 *
281 * @return number of groups for the specified device
282 */
283 @Override
284 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700285 return (getGroups(deviceId) != null) ?
286 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800287 }
288
289 /**
290 * Returns the groups associated with a device.
291 *
292 * @param deviceId the device ID
293 *
294 * @return the group entries
295 */
296 @Override
297 public Iterable<Group> getGroups(DeviceId deviceId) {
298 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700299 log.trace("getGroups: for device {} total number of groups {}",
300 deviceId, getGroupStoreKeyMap().values().size());
301 return FluentIterable.from(getGroupStoreKeyMap().values())
302 .filter(input -> input.deviceId().equals(deviceId))
303 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800304 }
305
306 /**
307 * Returns the stored group entry.
308 *
309 * @param deviceId the device ID
310 * @param appCookie the group key
311 *
312 * @return a group associated with the key
313 */
314 @Override
315 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700316 return getStoredGroupEntry(deviceId, appCookie);
317 }
318
319 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
320 GroupKey appCookie) {
321 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
322 appCookie));
323 }
324
325 @Override
326 public Group getGroup(DeviceId deviceId, GroupId groupId) {
327 return getStoredGroupEntry(deviceId, groupId);
328 }
329
330 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
331 GroupId groupId) {
332 return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
333 groupId));
alshabib10580802015-02-18 18:30:33 -0800334 }
335
336 private int getFreeGroupIdValue(DeviceId deviceId) {
337 int freeId = groupIdGen.incrementAndGet();
338
339 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700340 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800341 if (existing == null) {
342 existing = (
343 extraneousGroupEntriesById.get(deviceId) != null) ?
344 extraneousGroupEntriesById.get(deviceId).
345 get(new DefaultGroupId(freeId)) :
346 null;
347 }
348 if (existing != null) {
349 freeId = groupIdGen.incrementAndGet();
350 } else {
351 break;
352 }
353 }
354 return freeId;
355 }
356
357 /**
358 * Stores a new group entry using the information from group description.
359 *
360 * @param groupDesc group description to be used to create group entry
361 */
362 @Override
363 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700364 log.trace("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800365 // Check if a group is existing with the same key
366 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700367 log.warn("Group already exists with the same key {}",
368 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800369 return;
370 }
371
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700372 // Check if group to be created by a remote instance
373 if (mastershipService.getLocalRole(
374 groupDesc.deviceId()) != MastershipRole.MASTER) {
375 log.debug("Device {} local role is not MASTER",
376 groupDesc.deviceId());
377 GroupStoreMessage groupOp = GroupStoreMessage.
378 createGroupAddRequestMsg(groupDesc.deviceId(),
379 groupDesc);
380 ClusterMessage message = new ClusterMessage(
381 clusterService.getLocalNode().id(),
382 GroupStoreMessageSubjects.
383 REMOTE_GROUP_OP_REQUEST,
384 kryoBuilder.build().serialize(groupOp));
385 if (!clusterCommunicator.unicast(message,
386 mastershipService.
387 getMasterFor(
388 groupDesc.deviceId()))) {
389 log.warn("Failed to send request to master: {} to {}",
390 message,
391 mastershipService.getMasterFor(groupDesc.deviceId()));
392 //TODO: Send Group operation failure event
393 }
394 log.debug("Sent Group operation request for device {} "
395 + "to remote MASTER {}",
396 groupDesc.deviceId(),
397 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800398 return;
399 }
400
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700401 log.debug("Store group for device {} is getting handled locally",
402 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800403 storeGroupDescriptionInternal(groupDesc);
404 }
405
406 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
407 // Check if a group is existing with the same key
408 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
409 return;
410 }
411
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700412 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
413 // Device group audit has not completed yet
414 // Add this group description to pending group key table
415 // Create a group entry object with Dummy Group ID
416 log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
417 + "pending...Queuing Group ADD request",
418 groupDesc.deviceId());
419 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
420 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
421 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
422 getPendingGroupKeyTable();
423 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
424 groupDesc.appCookie()),
425 group);
426 return;
427 }
428
alshabib10580802015-02-18 18:30:33 -0800429 // Get a new group identifier
430 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
431 // Create a group entry object
432 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700433 // Insert the newly created group entry into key and id maps
434 getGroupStoreKeyMap().
435 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
436 groupDesc.appCookie()), group);
437 getGroupStoreIdMap().
438 put(new GroupStoreIdMapKey(groupDesc.deviceId(),
439 id), group);
alshabib10580802015-02-18 18:30:33 -0800440 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
441 group));
442 }
443
444 /**
445 * Updates the existing group entry with the information
446 * from group description.
447 *
448 * @param deviceId the device ID
449 * @param oldAppCookie the current group key
450 * @param type update type
451 * @param newBuckets group buckets for updates
452 * @param newAppCookie optional new group key
453 */
454 @Override
455 public void updateGroupDescription(DeviceId deviceId,
456 GroupKey oldAppCookie,
457 UpdateType type,
458 GroupBuckets newBuckets,
459 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700460 // Check if group update to be done by a remote instance
461 if (mastershipService.
462 getLocalRole(deviceId) != MastershipRole.MASTER) {
463 GroupStoreMessage groupOp = GroupStoreMessage.
464 createGroupUpdateRequestMsg(deviceId,
465 oldAppCookie,
466 type,
467 newBuckets,
468 newAppCookie);
469 ClusterMessage message =
470 new ClusterMessage(clusterService.getLocalNode().id(),
471 GroupStoreMessageSubjects.
472 REMOTE_GROUP_OP_REQUEST,
473 kryoBuilder.build().serialize(groupOp));
474 if (!clusterCommunicator.unicast(message,
475 mastershipService.
476 getMasterFor(deviceId))) {
477 log.warn("Failed to send request to master: {} to {}",
478 message,
479 mastershipService.getMasterFor(deviceId));
480 //TODO: Send Group operation failure event
481 }
482 return;
483 }
484 updateGroupDescriptionInternal(deviceId,
485 oldAppCookie,
486 type,
487 newBuckets,
488 newAppCookie);
489 }
490
491 private void updateGroupDescriptionInternal(DeviceId deviceId,
492 GroupKey oldAppCookie,
493 UpdateType type,
494 GroupBuckets newBuckets,
495 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800496 // Check if a group is existing with the provided key
497 Group oldGroup = getGroup(deviceId, oldAppCookie);
498 if (oldGroup == null) {
499 return;
500 }
501
502 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
503 type,
504 newBuckets);
505 if (newBucketList != null) {
506 // Create a new group object from the old group
507 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
508 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
509 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
510 oldGroup.deviceId(),
511 oldGroup.type(),
512 updatedBuckets,
513 newCookie,
514 oldGroup.appId());
515 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
516 updatedGroupDesc);
517 newGroup.setState(GroupState.PENDING_UPDATE);
518 newGroup.setLife(oldGroup.life());
519 newGroup.setPackets(oldGroup.packets());
520 newGroup.setBytes(oldGroup.bytes());
521 // Remove the old entry from maps and add new entry using new key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700522 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
523 oldGroup.appCookie()));
524 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
525 oldGroup.id()));
526 getGroupStoreKeyMap().
527 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
528 newGroup.appCookie()), newGroup);
529 getGroupStoreIdMap().
530 put(new GroupStoreIdMapKey(newGroup.deviceId(),
531 newGroup.id()), newGroup);
532
alshabib10580802015-02-18 18:30:33 -0800533 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
534 }
535 }
536
537 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
538 UpdateType type,
539 GroupBuckets buckets) {
540 GroupBuckets oldBuckets = oldGroup.buckets();
541 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
542 oldBuckets.buckets());
543 boolean groupDescUpdated = false;
544
545 if (type == UpdateType.ADD) {
546 // Check if the any of the new buckets are part of
547 // the old bucket list
548 for (GroupBucket addBucket:buckets.buckets()) {
549 if (!newBucketList.contains(addBucket)) {
550 newBucketList.add(addBucket);
551 groupDescUpdated = true;
552 }
553 }
554 } else if (type == UpdateType.REMOVE) {
555 // Check if the to be removed buckets are part of the
556 // old bucket list
557 for (GroupBucket removeBucket:buckets.buckets()) {
558 if (newBucketList.contains(removeBucket)) {
559 newBucketList.remove(removeBucket);
560 groupDescUpdated = true;
561 }
562 }
563 }
564
565 if (groupDescUpdated) {
566 return newBucketList;
567 } else {
568 return null;
569 }
570 }
571
572 /**
573 * Triggers deleting the existing group entry.
574 *
575 * @param deviceId the device ID
576 * @param appCookie the group key
577 */
578 @Override
579 public void deleteGroupDescription(DeviceId deviceId,
580 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700581 // Check if group to be deleted by a remote instance
582 if (mastershipService.
583 getLocalRole(deviceId) != MastershipRole.MASTER) {
584 GroupStoreMessage groupOp = GroupStoreMessage.
585 createGroupDeleteRequestMsg(deviceId,
586 appCookie);
587 ClusterMessage message =
588 new ClusterMessage(clusterService.getLocalNode().id(),
589 GroupStoreMessageSubjects.
590 REMOTE_GROUP_OP_REQUEST,
591 kryoBuilder.build().serialize(groupOp));
592 if (!clusterCommunicator.unicast(message,
593 mastershipService.
594 getMasterFor(deviceId))) {
595 log.warn("Failed to send request to master: {} to {}",
596 message,
597 mastershipService.getMasterFor(deviceId));
598 //TODO: Send Group operation failure event
599 }
600 return;
601 }
602 deleteGroupDescriptionInternal(deviceId, appCookie);
603 }
604
605 private void deleteGroupDescriptionInternal(DeviceId deviceId,
606 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800607 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700608 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800609 if (existing == null) {
610 return;
611 }
612
613 synchronized (existing) {
614 existing.setState(GroupState.PENDING_DELETE);
615 }
616 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
617 }
618
619 /**
620 * Stores a new group entry, or updates an existing entry.
621 *
622 * @param group group entry
623 */
624 @Override
625 public void addOrUpdateGroupEntry(Group group) {
626 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700627 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
628 group.id());
alshabib10580802015-02-18 18:30:33 -0800629 GroupEvent event = null;
630
631 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700632 log.trace("addOrUpdateGroupEntry: updating group "
633 + "entry {} in device {}",
634 group.id(),
635 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800636 synchronized (existing) {
637 existing.setLife(group.life());
638 existing.setPackets(group.packets());
639 existing.setBytes(group.bytes());
640 if (existing.state() == GroupState.PENDING_ADD) {
641 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700642 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800643 event = new GroupEvent(Type.GROUP_ADDED, existing);
644 } else {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700645 existing.setState(GroupState.ADDED);
646 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800647 event = new GroupEvent(Type.GROUP_UPDATED, existing);
648 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700649 //Re-PUT map entries to trigger map update events
650 getGroupStoreKeyMap().
651 put(new GroupStoreKeyMapKey(existing.deviceId(),
652 existing.appCookie()), existing);
653 getGroupStoreIdMap().
654 put(new GroupStoreIdMapKey(existing.deviceId(),
655 existing.id()), existing);
alshabib10580802015-02-18 18:30:33 -0800656 }
657 }
658
659 if (event != null) {
660 notifyDelegate(event);
661 }
662 }
663
664 /**
665 * Removes the group entry from store.
666 *
667 * @param group group entry
668 */
669 @Override
670 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700671 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
672 group.id());
alshabib10580802015-02-18 18:30:33 -0800673
674 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700675 log.trace("removeGroupEntry: removing group "
676 + "entry {} in device {}",
677 group.id(),
678 group.deviceId());
679 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
680 existing.appCookie()));
681 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
682 existing.id()));
alshabib10580802015-02-18 18:30:33 -0800683 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
684 }
685 }
686
687 @Override
688 public void deviceInitialAuditCompleted(DeviceId deviceId,
689 boolean completed) {
690 synchronized (deviceAuditStatus) {
691 if (completed) {
692 log.debug("deviceInitialAuditCompleted: AUDIT "
693 + "completed for device {}", deviceId);
694 deviceAuditStatus.put(deviceId, true);
695 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700696 List<StoredGroupEntry> pendingGroupRequests =
697 getPendingGroupKeyTable().values()
698 .stream()
699 .filter(g-> g.deviceId().equals(deviceId))
700 .collect(Collectors.toList());
701 log.trace("deviceInitialAuditCompleted: processing "
702 + "pending group add requests for device {} and "
703 + "number of pending requests {}",
704 deviceId,
705 pendingGroupRequests.size());
706 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800707 GroupDescription tmp = new DefaultGroupDescription(
708 group.deviceId(),
709 group.type(),
710 group.buckets(),
711 group.appCookie(),
712 group.appId());
713 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700714 getPendingGroupKeyTable().
715 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800716 }
alshabib10580802015-02-18 18:30:33 -0800717 } else {
718 if (deviceAuditStatus.get(deviceId)) {
719 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
720 + "status for device {}", deviceId);
721 deviceAuditStatus.put(deviceId, false);
722 }
723 }
724 }
725 }
726
727 @Override
728 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
729 synchronized (deviceAuditStatus) {
730 return (deviceAuditStatus.get(deviceId) != null)
731 ? deviceAuditStatus.get(deviceId) : false;
732 }
733 }
734
735 @Override
736 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
737
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700738 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
739 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800740
741 if (existing == null) {
742 log.warn("No group entry with ID {} found ", operation.groupId());
743 return;
744 }
745
746 switch (operation.opType()) {
747 case ADD:
748 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
749 break;
750 case MODIFY:
751 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
752 break;
753 case DELETE:
754 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
755 break;
756 default:
757 log.warn("Unknown group operation type {}", operation.opType());
758 }
759
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700760 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
761 existing.appCookie()));
762 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
763 existing.id()));
alshabib10580802015-02-18 18:30:33 -0800764 }
765
766 @Override
767 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700768 log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
769 + "group entry {} in device {}",
770 group.id(),
771 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800772 ConcurrentMap<GroupId, Group> extraneousIdTable =
773 getExtraneousGroupIdTable(group.deviceId());
774 extraneousIdTable.put(group.id(), group);
775 // Check the reference counter
776 if (group.referenceCount() == 0) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700777 log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
778 + "counter is zero and triggering remove",
779 group.id(),
780 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800781 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
782 }
783 }
784
785 @Override
786 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700787 log.trace("removeExtraneousGroupEntry: remove extraneous "
788 + "group entry {} of device {} from store",
789 group.id(),
790 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800791 ConcurrentMap<GroupId, Group> extraneousIdTable =
792 getExtraneousGroupIdTable(group.deviceId());
793 extraneousIdTable.remove(group.id());
794 }
795
796 @Override
797 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
798 // flatten and make iterator unmodifiable
799 return FluentIterable.from(
800 getExtraneousGroupIdTable(deviceId).values());
801 }
802
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700803 /**
804 * ClockService that generates wallclock based timestamps.
805 */
806 private class GroupStoreLogicalClockManager<T, U>
807 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800808
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700809 private final AtomicLong sequenceNumber = new AtomicLong(0);
810
811 @Override
812 public Timestamp getTimestamp(T t1, U u1) {
813 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
814 sequenceNumber.getAndIncrement());
815 }
816 }
817
818 /**
819 * Map handler to receive any events when the group map is updated.
820 */
821 private class GroupStoreIdMapListener implements
822 EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
823
824 @Override
825 public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
826 StoredGroupEntry> mapEvent) {
827 GroupEvent groupEvent = null;
828 log.trace("GroupStoreIdMapListener: received groupid map event {}",
829 mapEvent.type());
830 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
831 log.trace("GroupIdMapListener: Received PUT event");
832 if (mapEvent.value().state() == Group.GroupState.ADDED) {
833 if (mapEvent.value().isGroupStateAddedFirstTime()) {
834 groupEvent = new GroupEvent(Type.GROUP_ADDED,
835 mapEvent.value());
836 log.trace("GroupIdMapListener: Received first time "
837 + "GROUP_ADDED state update");
838 } else {
839 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
840 mapEvent.value());
841 log.trace("GroupIdMapListener: Received following "
842 + "GROUP_ADDED state update");
843 }
844 }
845 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
846 log.trace("GroupIdMapListener: Received REMOVE event");
847 groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
848 }
849
850 if (groupEvent != null) {
851 notifyDelegate(groupEvent);
852 }
853 }
854 }
855 /**
856 * Message handler to receive messages from group subsystems of
857 * other cluster members.
858 */
859 private final class ClusterGroupMsgHandler
860 implements ClusterMessageHandler {
861 @Override
862 public void handle(ClusterMessage message) {
863 log.trace("ClusterGroupMsgHandler: received remote group message");
864 if (message.subject() ==
865 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
866 GroupStoreMessage groupOp = kryoBuilder.
867 build().deserialize(message.payload());
868 log.trace("received remote group operation request");
869 if (!(mastershipService.
870 getLocalRole(groupOp.deviceId()) !=
871 MastershipRole.MASTER)) {
872 log.warn("ClusterGroupMsgHandler: This node is not "
873 + "MASTER for device {}", groupOp.deviceId());
874 return;
875 }
876 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
877 log.trace("processing remote group "
878 + "add operation request");
879 storeGroupDescriptionInternal(groupOp.groupDesc());
880 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
881 log.trace("processing remote group "
882 + "update operation request");
883 updateGroupDescriptionInternal(groupOp.deviceId(),
884 groupOp.appCookie(),
885 groupOp.updateType(),
886 groupOp.updateBuckets(),
887 groupOp.newAppCookie());
888 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
889 log.trace("processing remote group "
890 + "delete operation request");
891 deleteGroupDescriptionInternal(groupOp.deviceId(),
892 groupOp.appCookie());
893 }
894 }
895 }
896 }
897
898 /**
899 * Flattened map key to be used to store group entries.
900 */
901 private class GroupStoreMapKey {
902 private final DeviceId deviceId;
903
904 public GroupStoreMapKey(DeviceId deviceId) {
905 this.deviceId = deviceId;
906 }
907
908 @Override
909 public boolean equals(Object o) {
910 if (this == o) {
911 return true;
912 }
913 if (!(o instanceof GroupStoreMapKey)) {
914 return false;
915 }
916 GroupStoreMapKey that = (GroupStoreMapKey) o;
917 return this.deviceId.equals(that.deviceId);
918 }
919
920 @Override
921 public int hashCode() {
922 int result = 17;
923
924 result = 31 * result + Objects.hash(this.deviceId);
925
926 return result;
927 }
928 }
929
930 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
931 private final GroupKey appCookie;
932 public GroupStoreKeyMapKey(DeviceId deviceId,
933 GroupKey appCookie) {
934 super(deviceId);
935 this.appCookie = appCookie;
936 }
937
938 @Override
939 public boolean equals(Object o) {
940 if (this == o) {
941 return true;
942 }
943 if (!(o instanceof GroupStoreKeyMapKey)) {
944 return false;
945 }
946 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
947 return (super.equals(that) &&
948 this.appCookie.equals(that.appCookie));
949 }
950
951 @Override
952 public int hashCode() {
953 int result = 17;
954
955 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
956
957 return result;
958 }
959 }
960
961 private class GroupStoreIdMapKey extends GroupStoreMapKey {
962 private final GroupId groupId;
963 public GroupStoreIdMapKey(DeviceId deviceId,
964 GroupId groupId) {
965 super(deviceId);
966 this.groupId = groupId;
967 }
968
969 @Override
970 public boolean equals(Object o) {
971 if (this == o) {
972 return true;
973 }
974 if (!(o instanceof GroupStoreIdMapKey)) {
975 return false;
976 }
977 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
978 return (super.equals(that) &&
979 this.groupId.equals(that.groupId));
980 }
981
982 @Override
983 public int hashCode() {
984 int result = 17;
985
986 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
987
988 return result;
989 }
990 }
alshabib10580802015-02-18 18:30:33 -0800991}