blob: 1930a473d01c9291d746156d419d9a39c1b3dfcb [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
alshabib10580802015-02-18 18:30:33 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080025import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080027import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080030import org.onosproject.core.DefaultGroupId;
31import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.net.MastershipRole;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.flow.instructions.L0ModificationInstruction;
40import org.onosproject.net.flow.instructions.L2ModificationInstruction;
41import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070043import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.Group;
47import org.onosproject.net.group.Group.GroupState;
48import org.onosproject.net.group.GroupBucket;
49import org.onosproject.net.group.GroupBuckets;
50import org.onosproject.net.group.GroupDescription;
51import org.onosproject.net.group.GroupEvent;
52import org.onosproject.net.group.GroupEvent.Type;
53import org.onosproject.net.group.GroupKey;
54import org.onosproject.net.group.GroupOperation;
55import org.onosproject.net.group.GroupStore;
56import org.onosproject.net.group.GroupStoreDelegate;
57import org.onosproject.net.group.StoredGroupEntry;
58import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070059import org.onosproject.store.Timestamp;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070063import org.onosproject.store.impl.MultiValuedTimestamp;
64import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070065import org.onosproject.store.service.ClockService;
66import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapBuilder;
68import org.onosproject.store.service.EventuallyConsistentMapEvent;
69import org.onosproject.store.service.EventuallyConsistentMapListener;
70import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080071import org.slf4j.Logger;
72
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.net.URI;
74import java.util.ArrayList;
75import java.util.HashMap;
76import java.util.List;
77import java.util.Objects;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.ConcurrentMap;
80import java.util.concurrent.ExecutorService;
81import java.util.concurrent.Executors;
82import java.util.concurrent.atomic.AtomicInteger;
83import java.util.concurrent.atomic.AtomicLong;
84import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
90/**
91 * Manages inventory of group entries using trivial in-memory implementation.
92 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 protected StorageService storageService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700114 protected MastershipService mastershipService;
115
116 // Per device group table with (device id + app cookie) as key
117 private EventuallyConsistentMap<GroupStoreKeyMapKey,
118 StoredGroupEntry> groupStoreEntriesByKey = null;
119 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700120 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
121 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
125 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private ExecutorService messageHandlingExecutor;
127 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800128
129 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
130 new HashMap<DeviceId, Boolean>();
131
132 private final AtomicInteger groupIdGen = new AtomicInteger();
133
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 private KryoNamespace.Builder kryoBuilder = null;
135
alshabib10580802015-02-18 18:30:33 -0800136 @Activate
137 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 kryoBuilder = new KryoNamespace.Builder()
139 .register(DefaultGroup.class,
140 DefaultGroupBucket.class,
141 DefaultGroupDescription.class,
142 DefaultGroupKey.class,
143 GroupDescription.Type.class,
144 Group.GroupState.class,
145 GroupBuckets.class,
146 DefaultGroupId.class,
147 GroupStoreMessage.class,
148 GroupStoreMessage.Type.class,
149 UpdateType.class,
150 GroupStoreMessageSubjects.class,
151 MultiValuedTimestamp.class,
152 GroupStoreKeyMapKey.class,
153 GroupStoreIdMapKey.class,
154 GroupStoreMapKey.class
155 )
156 .register(URI.class)
157 .register(DeviceId.class)
158 .register(PortNumber.class)
159 .register(DefaultApplicationId.class)
160 .register(DefaultTrafficTreatment.class,
161 Instructions.DropInstruction.class,
162 Instructions.OutputInstruction.class,
163 Instructions.GroupInstruction.class,
164 Instructions.TableTypeTransition.class,
165 FlowRule.Type.class,
166 L0ModificationInstruction.class,
167 L0ModificationInstruction.L0SubType.class,
168 L0ModificationInstruction.ModLambdaInstruction.class,
169 L2ModificationInstruction.class,
170 L2ModificationInstruction.L2SubType.class,
171 L2ModificationInstruction.ModEtherInstruction.class,
172 L2ModificationInstruction.PushHeaderInstructions.class,
173 L2ModificationInstruction.ModVlanIdInstruction.class,
174 L2ModificationInstruction.ModVlanPcpInstruction.class,
175 L2ModificationInstruction.ModMplsLabelInstruction.class,
176 L2ModificationInstruction.ModMplsTtlInstruction.class,
177 L3ModificationInstruction.class,
178 L3ModificationInstruction.L3SubType.class,
179 L3ModificationInstruction.ModIPInstruction.class,
180 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
181 L3ModificationInstruction.ModTtlInstruction.class,
182 org.onlab.packet.MplsLabel.class
183 )
184 .register(org.onosproject.cluster.NodeId.class)
185 .register(KryoNamespaces.BASIC)
186 .register(KryoNamespaces.MISC);
187
188 messageHandlingExecutor = Executors.
189 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
190 groupedThreads("onos/store/group",
191 "message-handlers"));
192 clusterCommunicator.
193 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
194 new ClusterGroupMsgHandler(),
195 messageHandlingExecutor);
196
197 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700198 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
199 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
200
201 groupStoreEntriesByKey = keyMapBuilder
202 .withName("groupstorekeymap")
203 .withSerializer(kryoBuilder)
204 .withClockService(new GroupStoreLogicalClockManager<>())
205 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700206 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700207 log.trace("Current size {}", groupStoreEntriesByKey.size());
208
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700209 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700210 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
211 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
212
213 auditPendingReqQueue = auditMapBuilder
214 .withName("pendinggroupkeymap")
215 .withSerializer(kryoBuilder)
216 .withClockService(new GroupStoreLogicalClockManager<>())
217 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 log.trace("Current size {}", auditPendingReqQueue.size());
219
alshabib10580802015-02-18 18:30:33 -0800220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700225 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700226 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800227 log.info("Stopped");
228 }
229
alshabib10580802015-02-18 18:30:33 -0800230 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700231 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800232 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
233 }
234
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700235 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
236 lazyEmptyGroupIdTable() {
237 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
238 }
239
alshabib10580802015-02-18 18:30:33 -0800240 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700241 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800242 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700243 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800244 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700245 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
246 getGroupStoreKeyMap() {
247 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800248 }
249
250 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700251 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800252 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700253 * @param deviceId identifier of the device
254 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700256 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
257 return createIfAbsentUnchecked(groupEntriesById,
258 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700262 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700264 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800265 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700266 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
267 getPendingGroupKeyTable() {
268 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800269 }
270
271 /**
272 * Returns the extraneous group id table for specified device.
273 *
274 * @param deviceId identifier of the device
275 * @return Map representing group key table of given device.
276 */
277 private ConcurrentMap<GroupId, Group>
278 getExtraneousGroupIdTable(DeviceId deviceId) {
279 return createIfAbsentUnchecked(extraneousGroupEntriesById,
280 deviceId,
281 lazyEmptyExtraneousGroupIdTable());
282 }
283
284 /**
285 * Returns the number of groups for the specified device in the store.
286 *
287 * @return number of groups for the specified device
288 */
289 @Override
290 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700291 return (getGroups(deviceId) != null) ?
292 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800293 }
294
295 /**
296 * Returns the groups associated with a device.
297 *
298 * @param deviceId the device ID
299 *
300 * @return the group entries
301 */
302 @Override
303 public Iterable<Group> getGroups(DeviceId deviceId) {
304 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700305 log.trace("getGroups: for device {} total number of groups {}",
306 deviceId, getGroupStoreKeyMap().values().size());
307 return FluentIterable.from(getGroupStoreKeyMap().values())
308 .filter(input -> input.deviceId().equals(deviceId))
309 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800310 }
311
312 /**
313 * Returns the stored group entry.
314 *
315 * @param deviceId the device ID
316 * @param appCookie the group key
317 *
318 * @return a group associated with the key
319 */
320 @Override
321 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700322 return getStoredGroupEntry(deviceId, appCookie);
323 }
324
325 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
326 GroupKey appCookie) {
327 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
328 appCookie));
329 }
330
331 @Override
332 public Group getGroup(DeviceId deviceId, GroupId groupId) {
333 return getStoredGroupEntry(deviceId, groupId);
334 }
335
336 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
337 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700338 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800339 }
340
341 private int getFreeGroupIdValue(DeviceId deviceId) {
342 int freeId = groupIdGen.incrementAndGet();
343
344 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700345 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800346 if (existing == null) {
347 existing = (
348 extraneousGroupEntriesById.get(deviceId) != null) ?
349 extraneousGroupEntriesById.get(deviceId).
350 get(new DefaultGroupId(freeId)) :
351 null;
352 }
353 if (existing != null) {
354 freeId = groupIdGen.incrementAndGet();
355 } else {
356 break;
357 }
358 }
359 return freeId;
360 }
361
362 /**
363 * Stores a new group entry using the information from group description.
364 *
365 * @param groupDesc group description to be used to create group entry
366 */
367 @Override
368 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700369 log.trace("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800370 // Check if a group is existing with the same key
371 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700372 log.warn("Group already exists with the same key {}",
373 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800374 return;
375 }
376
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700377 // Check if group to be created by a remote instance
378 if (mastershipService.getLocalRole(
379 groupDesc.deviceId()) != MastershipRole.MASTER) {
380 log.debug("Device {} local role is not MASTER",
381 groupDesc.deviceId());
382 GroupStoreMessage groupOp = GroupStoreMessage.
383 createGroupAddRequestMsg(groupDesc.deviceId(),
384 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700385
386 if (!clusterCommunicator.unicast(groupOp,
387 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
388 m -> kryoBuilder.build().serialize(m),
389 mastershipService.getMasterFor(groupDesc.deviceId()))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700390 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700391 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 mastershipService.getMasterFor(groupDesc.deviceId()));
393 //TODO: Send Group operation failure event
394 }
395 log.debug("Sent Group operation request for device {} "
396 + "to remote MASTER {}",
397 groupDesc.deviceId(),
398 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800399 return;
400 }
401
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700402 log.debug("Store group for device {} is getting handled locally",
403 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800404 storeGroupDescriptionInternal(groupDesc);
405 }
406
407 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
408 // Check if a group is existing with the same key
409 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
410 return;
411 }
412
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700413 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
414 // Device group audit has not completed yet
415 // Add this group description to pending group key table
416 // Create a group entry object with Dummy Group ID
417 log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
418 + "pending...Queuing Group ADD request",
419 groupDesc.deviceId());
420 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
421 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
422 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
423 getPendingGroupKeyTable();
424 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
425 groupDesc.appCookie()),
426 group);
427 return;
428 }
429
alshabib10580802015-02-18 18:30:33 -0800430 // Get a new group identifier
431 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
432 // Create a group entry object
433 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700434 // Insert the newly created group entry into key and id maps
435 getGroupStoreKeyMap().
436 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
437 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700438 // Ensure it also inserted into group id based table to
439 // avoid any chances of duplication in group id generation
440 getGroupIdTable(groupDesc.deviceId()).
441 put(id, group);
alshabib10580802015-02-18 18:30:33 -0800442 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
443 group));
444 }
445
446 /**
447 * Updates the existing group entry with the information
448 * from group description.
449 *
450 * @param deviceId the device ID
451 * @param oldAppCookie the current group key
452 * @param type update type
453 * @param newBuckets group buckets for updates
454 * @param newAppCookie optional new group key
455 */
456 @Override
457 public void updateGroupDescription(DeviceId deviceId,
458 GroupKey oldAppCookie,
459 UpdateType type,
460 GroupBuckets newBuckets,
461 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700462 // Check if group update to be done by a remote instance
463 if (mastershipService.
464 getLocalRole(deviceId) != MastershipRole.MASTER) {
465 GroupStoreMessage groupOp = GroupStoreMessage.
466 createGroupUpdateRequestMsg(deviceId,
467 oldAppCookie,
468 type,
469 newBuckets,
470 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700471
472 if (!clusterCommunicator.unicast(groupOp,
473 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
474 m -> kryoBuilder.build().serialize(m),
475 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700476 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700477 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700478 mastershipService.getMasterFor(deviceId));
479 //TODO: Send Group operation failure event
480 }
481 return;
482 }
483 updateGroupDescriptionInternal(deviceId,
484 oldAppCookie,
485 type,
486 newBuckets,
487 newAppCookie);
488 }
489
490 private void updateGroupDescriptionInternal(DeviceId deviceId,
491 GroupKey oldAppCookie,
492 UpdateType type,
493 GroupBuckets newBuckets,
494 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800495 // Check if a group is existing with the provided key
496 Group oldGroup = getGroup(deviceId, oldAppCookie);
497 if (oldGroup == null) {
498 return;
499 }
500
501 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
502 type,
503 newBuckets);
504 if (newBucketList != null) {
505 // Create a new group object from the old group
506 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
507 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
508 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
509 oldGroup.deviceId(),
510 oldGroup.type(),
511 updatedBuckets,
512 newCookie,
513 oldGroup.appId());
514 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
515 updatedGroupDesc);
516 newGroup.setState(GroupState.PENDING_UPDATE);
517 newGroup.setLife(oldGroup.life());
518 newGroup.setPackets(oldGroup.packets());
519 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700520 //Update the group entry in groupkey based map.
521 //Update to groupid based map will happen in the
522 //groupkey based map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700523 getGroupStoreKeyMap().
524 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
525 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800526 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
527 }
528 }
529
530 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
531 UpdateType type,
532 GroupBuckets buckets) {
533 GroupBuckets oldBuckets = oldGroup.buckets();
534 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
535 oldBuckets.buckets());
536 boolean groupDescUpdated = false;
537
538 if (type == UpdateType.ADD) {
539 // Check if the any of the new buckets are part of
540 // the old bucket list
541 for (GroupBucket addBucket:buckets.buckets()) {
542 if (!newBucketList.contains(addBucket)) {
543 newBucketList.add(addBucket);
544 groupDescUpdated = true;
545 }
546 }
547 } else if (type == UpdateType.REMOVE) {
548 // Check if the to be removed buckets are part of the
549 // old bucket list
550 for (GroupBucket removeBucket:buckets.buckets()) {
551 if (newBucketList.contains(removeBucket)) {
552 newBucketList.remove(removeBucket);
553 groupDescUpdated = true;
554 }
555 }
556 }
557
558 if (groupDescUpdated) {
559 return newBucketList;
560 } else {
561 return null;
562 }
563 }
564
565 /**
566 * Triggers deleting the existing group entry.
567 *
568 * @param deviceId the device ID
569 * @param appCookie the group key
570 */
571 @Override
572 public void deleteGroupDescription(DeviceId deviceId,
573 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700574 // Check if group to be deleted by a remote instance
575 if (mastershipService.
576 getLocalRole(deviceId) != MastershipRole.MASTER) {
577 GroupStoreMessage groupOp = GroupStoreMessage.
578 createGroupDeleteRequestMsg(deviceId,
579 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700580
581 if (!clusterCommunicator.unicast(groupOp,
582 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
583 m -> kryoBuilder.build().serialize(m),
584 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700585 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700586 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700587 mastershipService.getMasterFor(deviceId));
588 //TODO: Send Group operation failure event
589 }
590 return;
591 }
592 deleteGroupDescriptionInternal(deviceId, appCookie);
593 }
594
595 private void deleteGroupDescriptionInternal(DeviceId deviceId,
596 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800597 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700598 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800599 if (existing == null) {
600 return;
601 }
602
603 synchronized (existing) {
604 existing.setState(GroupState.PENDING_DELETE);
605 }
606 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
607 }
608
609 /**
610 * Stores a new group entry, or updates an existing entry.
611 *
612 * @param group group entry
613 */
614 @Override
615 public void addOrUpdateGroupEntry(Group group) {
616 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700617 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
618 group.id());
alshabib10580802015-02-18 18:30:33 -0800619 GroupEvent event = null;
620
621 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700622 log.trace("addOrUpdateGroupEntry: updating group "
623 + "entry {} in device {}",
624 group.id(),
625 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800626 synchronized (existing) {
627 existing.setLife(group.life());
628 existing.setPackets(group.packets());
629 existing.setBytes(group.bytes());
630 if (existing.state() == GroupState.PENDING_ADD) {
631 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700632 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800633 event = new GroupEvent(Type.GROUP_ADDED, existing);
634 } else {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700635 existing.setState(GroupState.ADDED);
636 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800637 event = new GroupEvent(Type.GROUP_UPDATED, existing);
638 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700639 //Re-PUT map entries to trigger map update events
640 getGroupStoreKeyMap().
641 put(new GroupStoreKeyMapKey(existing.deviceId(),
642 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800643 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700644 } else {
645 log.warn("addOrUpdateGroupEntry: Group update "
646 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800647 }
648
649 if (event != null) {
650 notifyDelegate(event);
651 }
652 }
653
654 /**
655 * Removes the group entry from store.
656 *
657 * @param group group entry
658 */
659 @Override
660 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700661 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
662 group.id());
alshabib10580802015-02-18 18:30:33 -0800663
664 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700665 log.trace("removeGroupEntry: removing group "
666 + "entry {} in device {}",
667 group.id(),
668 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700669 //Removal from groupid based map will happen in the
670 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700671 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
672 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800673 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
674 }
675 }
676
677 @Override
678 public void deviceInitialAuditCompleted(DeviceId deviceId,
679 boolean completed) {
680 synchronized (deviceAuditStatus) {
681 if (completed) {
682 log.debug("deviceInitialAuditCompleted: AUDIT "
683 + "completed for device {}", deviceId);
684 deviceAuditStatus.put(deviceId, true);
685 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700686 List<StoredGroupEntry> pendingGroupRequests =
687 getPendingGroupKeyTable().values()
688 .stream()
689 .filter(g-> g.deviceId().equals(deviceId))
690 .collect(Collectors.toList());
691 log.trace("deviceInitialAuditCompleted: processing "
692 + "pending group add requests for device {} and "
693 + "number of pending requests {}",
694 deviceId,
695 pendingGroupRequests.size());
696 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800697 GroupDescription tmp = new DefaultGroupDescription(
698 group.deviceId(),
699 group.type(),
700 group.buckets(),
701 group.appCookie(),
702 group.appId());
703 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700704 getPendingGroupKeyTable().
705 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800706 }
alshabib10580802015-02-18 18:30:33 -0800707 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700708 Boolean audited = deviceAuditStatus.get(deviceId);
709 if (audited != null && audited) {
alshabib10580802015-02-18 18:30:33 -0800710 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
711 + "status for device {}", deviceId);
712 deviceAuditStatus.put(deviceId, false);
713 }
714 }
715 }
716 }
717
718 @Override
719 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
720 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700721 Boolean audited = deviceAuditStatus.get(deviceId);
722 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800723 }
724 }
725
726 @Override
727 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
728
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700729 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
730 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800731
732 if (existing == null) {
733 log.warn("No group entry with ID {} found ", operation.groupId());
734 return;
735 }
736
737 switch (operation.opType()) {
738 case ADD:
739 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
740 break;
741 case MODIFY:
742 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
743 break;
744 case DELETE:
745 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
746 break;
747 default:
748 log.warn("Unknown group operation type {}", operation.opType());
749 }
750
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700751 //Removal from groupid based map will happen in the
752 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700753 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
754 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800755 }
756
757 @Override
758 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700759 log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
760 + "group entry {} in device {}",
761 group.id(),
762 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800763 ConcurrentMap<GroupId, Group> extraneousIdTable =
764 getExtraneousGroupIdTable(group.deviceId());
765 extraneousIdTable.put(group.id(), group);
766 // Check the reference counter
767 if (group.referenceCount() == 0) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700768 log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
769 + "counter is zero and triggering remove",
770 group.id(),
771 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800772 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
773 }
774 }
775
776 @Override
777 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700778 log.trace("removeExtraneousGroupEntry: remove extraneous "
779 + "group entry {} of device {} from store",
780 group.id(),
781 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800782 ConcurrentMap<GroupId, Group> extraneousIdTable =
783 getExtraneousGroupIdTable(group.deviceId());
784 extraneousIdTable.remove(group.id());
785 }
786
787 @Override
788 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
789 // flatten and make iterator unmodifiable
790 return FluentIterable.from(
791 getExtraneousGroupIdTable(deviceId).values());
792 }
793
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700794 /**
795 * ClockService that generates wallclock based timestamps.
796 */
797 private class GroupStoreLogicalClockManager<T, U>
798 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800799
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700800 private final AtomicLong sequenceNumber = new AtomicLong(0);
801
802 @Override
803 public Timestamp getTimestamp(T t1, U u1) {
804 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
805 sequenceNumber.getAndIncrement());
806 }
807 }
808
809 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700810 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700811 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700812 private class GroupStoreKeyMapListener implements
813 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700814
815 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700816 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700817 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700818 GroupEvent groupEvent = null;
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700819 StoredGroupEntry group = mapEvent.value();
820 log.trace("GroupStoreKeyMapListener: received groupid map event {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700821 mapEvent.type());
822 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700823 log.trace("GroupStoreKeyMapListener: Received PUT event");
824 // Update the group ID table
825 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700826 if (mapEvent.value().state() == Group.GroupState.ADDED) {
827 if (mapEvent.value().isGroupStateAddedFirstTime()) {
828 groupEvent = new GroupEvent(Type.GROUP_ADDED,
829 mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700830 log.trace("GroupStoreKeyMapListener: Received first time "
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700831 + "GROUP_ADDED state update");
832 } else {
833 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
834 mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700835 log.trace("GroupStoreKeyMapListener: Received following "
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700836 + "GROUP_ADDED state update");
837 }
838 }
839 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700840 log.trace("GroupStoreKeyMapListener: Received REMOVE event");
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700841 groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700842 // Remove the entry from the group ID table
843 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700844 }
845
846 if (groupEvent != null) {
847 notifyDelegate(groupEvent);
848 }
849 }
850 }
851 /**
852 * Message handler to receive messages from group subsystems of
853 * other cluster members.
854 */
855 private final class ClusterGroupMsgHandler
856 implements ClusterMessageHandler {
857 @Override
858 public void handle(ClusterMessage message) {
859 log.trace("ClusterGroupMsgHandler: received remote group message");
860 if (message.subject() ==
861 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
862 GroupStoreMessage groupOp = kryoBuilder.
863 build().deserialize(message.payload());
864 log.trace("received remote group operation request");
865 if (!(mastershipService.
866 getLocalRole(groupOp.deviceId()) !=
867 MastershipRole.MASTER)) {
868 log.warn("ClusterGroupMsgHandler: This node is not "
869 + "MASTER for device {}", groupOp.deviceId());
870 return;
871 }
872 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
873 log.trace("processing remote group "
874 + "add operation request");
875 storeGroupDescriptionInternal(groupOp.groupDesc());
876 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
877 log.trace("processing remote group "
878 + "update operation request");
879 updateGroupDescriptionInternal(groupOp.deviceId(),
880 groupOp.appCookie(),
881 groupOp.updateType(),
882 groupOp.updateBuckets(),
883 groupOp.newAppCookie());
884 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
885 log.trace("processing remote group "
886 + "delete operation request");
887 deleteGroupDescriptionInternal(groupOp.deviceId(),
888 groupOp.appCookie());
889 }
890 }
891 }
892 }
893
894 /**
895 * Flattened map key to be used to store group entries.
896 */
897 private class GroupStoreMapKey {
898 private final DeviceId deviceId;
899
900 public GroupStoreMapKey(DeviceId deviceId) {
901 this.deviceId = deviceId;
902 }
903
904 @Override
905 public boolean equals(Object o) {
906 if (this == o) {
907 return true;
908 }
909 if (!(o instanceof GroupStoreMapKey)) {
910 return false;
911 }
912 GroupStoreMapKey that = (GroupStoreMapKey) o;
913 return this.deviceId.equals(that.deviceId);
914 }
915
916 @Override
917 public int hashCode() {
918 int result = 17;
919
920 result = 31 * result + Objects.hash(this.deviceId);
921
922 return result;
923 }
924 }
925
926 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
927 private final GroupKey appCookie;
928 public GroupStoreKeyMapKey(DeviceId deviceId,
929 GroupKey appCookie) {
930 super(deviceId);
931 this.appCookie = appCookie;
932 }
933
934 @Override
935 public boolean equals(Object o) {
936 if (this == o) {
937 return true;
938 }
939 if (!(o instanceof GroupStoreKeyMapKey)) {
940 return false;
941 }
942 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
943 return (super.equals(that) &&
944 this.appCookie.equals(that.appCookie));
945 }
946
947 @Override
948 public int hashCode() {
949 int result = 17;
950
951 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
952
953 return result;
954 }
955 }
956
957 private class GroupStoreIdMapKey extends GroupStoreMapKey {
958 private final GroupId groupId;
959 public GroupStoreIdMapKey(DeviceId deviceId,
960 GroupId groupId) {
961 super(deviceId);
962 this.groupId = groupId;
963 }
964
965 @Override
966 public boolean equals(Object o) {
967 if (this == o) {
968 return true;
969 }
970 if (!(o instanceof GroupStoreIdMapKey)) {
971 return false;
972 }
973 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
974 return (super.equals(that) &&
975 this.groupId.equals(that.groupId));
976 }
977
978 @Override
979 public int hashCode() {
980 int result = 17;
981
982 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
983
984 return result;
985 }
986 }
alshabib10580802015-02-18 18:30:33 -0800987}