blob: 5551ecf60243c46c90bde9eebb21144295b3f3e2 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
alshabib10580802015-02-18 18:30:33 -080031import org.onosproject.core.DefaultGroupId;
32import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070033import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.Group;
41import org.onosproject.net.group.Group.GroupState;
42import org.onosproject.net.group.GroupBucket;
43import org.onosproject.net.group.GroupBuckets;
44import org.onosproject.net.group.GroupDescription;
45import org.onosproject.net.group.GroupEvent;
46import org.onosproject.net.group.GroupEvent.Type;
47import org.onosproject.net.group.GroupKey;
48import org.onosproject.net.group.GroupOperation;
49import org.onosproject.net.group.GroupStore;
50import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070051import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080052import org.onosproject.net.group.StoredGroupEntry;
53import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070054import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070055import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070057import org.onosproject.store.service.EventuallyConsistentMap;
58import org.onosproject.store.service.EventuallyConsistentMapBuilder;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
61import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080062import org.slf4j.Logger;
63
Jonathan Hart6ec029a2015-03-24 17:12:35 -070064import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070065import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070068import java.util.List;
69import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070070import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070072import java.util.concurrent.ConcurrentHashMap;
73import java.util.concurrent.ConcurrentMap;
74import java.util.concurrent.ExecutorService;
75import java.util.concurrent.Executors;
76import java.util.concurrent.atomic.AtomicInteger;
77import java.util.concurrent.atomic.AtomicLong;
78import java.util.stream.Collectors;
79
80import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
81import static org.onlab.util.Tools.groupedThreads;
82import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080083
/**
 * Manages inventory of group entries using eventually consistent maps and
 * forwards group operations for devices mastered elsewhere to the master node.
 */
87@Component(immediate = true)
88@Service
89public class DistributedGroupStore
90 extends AbstractStore<GroupEvent, GroupStoreDelegate>
91 implements GroupStore {
92
// Logger named after the runtime class of this store.
private final Logger log = getLogger(getClass());

// Placeholder identifier (all bits set) assigned to group entries that are
// queued while a device audit is still pending; see storeGroupDescriptionInternal.
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);

// Used to subscribe to, and unicast, remote group operation requests.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;

@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;

// Factory for the eventually consistent maps built in activate().
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;

// Consulted to decide whether an operation is handled locally or sent to the master.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;

// Per device group table with (device id + app cookie) as key
private EventuallyConsistentMap<GroupStoreKeyMapKey,
        StoredGroupEntry> groupStoreEntriesByKey = null;
// Per device group table with (device id + group id) as key
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
        groupEntriesById = new ConcurrentHashMap<>();
// Group requests queued (keyed like groupStoreEntriesByKey) until the device audit completes.
private EventuallyConsistentMap<GroupStoreKeyMapKey,
        StoredGroupEntry> auditPendingReqQueue = null;
// Per device table of extraneous groups with group id as key.
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
        extraneousGroupEntriesById = new ConcurrentHashMap<>();
// Executor on which remote group operation requests are processed; created in activate().
private ExecutorService messageHandlingExecutor;
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;

// A missing entry for a device is treated as "audit not completed yet".
// NOTE(review): plain HashMap; confirm how concurrent access is guarded.
private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();

// Source of candidate group identifiers; see getFreeGroupIdValue.
private final AtomicInteger groupIdGen = new AtomicInteger();

// Serializer namespace shared by the maps and the cluster messages; built in activate().
private KryoNamespace.Builder kryoBuilder = null;

// Second component of the timestamps handed to the eventually consistent maps.
private final AtomicLong sequenceNumber = new AtomicLong(0);
130
alshabib10580802015-02-18 18:30:33 -0800131 @Activate
132 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700133 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700134 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 .register(DefaultGroup.class,
136 DefaultGroupBucket.class,
137 DefaultGroupDescription.class,
138 DefaultGroupKey.class,
139 GroupDescription.Type.class,
140 Group.GroupState.class,
141 GroupBuckets.class,
142 DefaultGroupId.class,
143 GroupStoreMessage.class,
144 GroupStoreMessage.Type.class,
145 UpdateType.class,
146 GroupStoreMessageSubjects.class,
147 MultiValuedTimestamp.class,
148 GroupStoreKeyMapKey.class,
149 GroupStoreIdMapKey.class,
150 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700151 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700152
153 messageHandlingExecutor = Executors.
154 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
155 groupedThreads("onos/store/group",
156 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700157
158 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
159 kryoBuilder.build()::deserialize,
160 this::process,
161 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700162
163 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700164 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
165 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
166
167 groupStoreEntriesByKey = keyMapBuilder
168 .withName("groupstorekeymap")
169 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700170 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
171 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700172 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700173 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700174 log.debug("Current size of groupstorekeymap:{}",
175 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700176
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700177 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700178 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
179 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
180
181 auditPendingReqQueue = auditMapBuilder
182 .withName("pendinggroupkeymap")
183 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700184 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
185 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700186 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700187 log.debug("Current size of pendinggroupkeymap:{}",
188 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700189
alshabib10580802015-02-18 18:30:33 -0800190 log.info("Started");
191 }
192
193 @Deactivate
194 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700195 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700196 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800197 log.info("Stopped");
198 }
199
alshabib10580802015-02-18 18:30:33 -0800200 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700201 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800202 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
203 }
204
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700205 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
206 lazyEmptyGroupIdTable() {
207 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
208 }
209
alshabib10580802015-02-18 18:30:33 -0800210 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700211 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800212 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700213 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800214 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700215 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
216 getGroupStoreKeyMap() {
217 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800218 }
219
220 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700221 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800222 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700223 * @param deviceId identifier of the device
224 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800225 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700226 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
227 return createIfAbsentUnchecked(groupEntriesById,
228 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800229 }
230
231 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700232 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800233 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700234 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800235 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700236 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
237 getPendingGroupKeyTable() {
238 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800239 }
240
241 /**
242 * Returns the extraneous group id table for specified device.
243 *
244 * @param deviceId identifier of the device
245 * @return Map representing group key table of given device.
246 */
247 private ConcurrentMap<GroupId, Group>
248 getExtraneousGroupIdTable(DeviceId deviceId) {
249 return createIfAbsentUnchecked(extraneousGroupEntriesById,
250 deviceId,
251 lazyEmptyExtraneousGroupIdTable());
252 }
253
254 /**
255 * Returns the number of groups for the specified device in the store.
256 *
257 * @return number of groups for the specified device
258 */
259 @Override
260 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700261 return (getGroups(deviceId) != null) ?
262 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800263 }
264
265 /**
266 * Returns the groups associated with a device.
267 *
268 * @param deviceId the device ID
269 *
270 * @return the group entries
271 */
272 @Override
273 public Iterable<Group> getGroups(DeviceId deviceId) {
274 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 return FluentIterable.from(getGroupStoreKeyMap().values())
276 .filter(input -> input.deviceId().equals(deviceId))
277 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800278 }
279
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700280 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
281 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700282 return FluentIterable.from(getGroupStoreKeyMap().values())
283 .filter(input -> input.deviceId().equals(deviceId));
284 }
285
alshabib10580802015-02-18 18:30:33 -0800286 /**
287 * Returns the stored group entry.
288 *
289 * @param deviceId the device ID
290 * @param appCookie the group key
291 *
292 * @return a group associated with the key
293 */
294 @Override
295 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700296 return getStoredGroupEntry(deviceId, appCookie);
297 }
298
299 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
300 GroupKey appCookie) {
301 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
302 appCookie));
303 }
304
305 @Override
306 public Group getGroup(DeviceId deviceId, GroupId groupId) {
307 return getStoredGroupEntry(deviceId, groupId);
308 }
309
310 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
311 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700312 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800313 }
314
315 private int getFreeGroupIdValue(DeviceId deviceId) {
316 int freeId = groupIdGen.incrementAndGet();
317
318 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700319 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800320 if (existing == null) {
321 existing = (
322 extraneousGroupEntriesById.get(deviceId) != null) ?
323 extraneousGroupEntriesById.get(deviceId).
324 get(new DefaultGroupId(freeId)) :
325 null;
326 }
327 if (existing != null) {
328 freeId = groupIdGen.incrementAndGet();
329 } else {
330 break;
331 }
332 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700333 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800334 return freeId;
335 }
336
337 /**
338 * Stores a new group entry using the information from group description.
339 *
340 * @param groupDesc group description to be used to create group entry
341 */
342 @Override
343 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700344 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800345 // Check if a group is existing with the same key
346 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700347 log.warn("Group already exists with the same key {}",
348 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800349 return;
350 }
351
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700352 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700353 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700354 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700355 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700356 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
357 log.error("No Master for device {}..."
358 + "Can not perform add group operation",
359 groupDesc.deviceId());
360 //TODO: Send Group operation failure event
361 return;
362 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700363 GroupStoreMessage groupOp = GroupStoreMessage.
364 createGroupAddRequestMsg(groupDesc.deviceId(),
365 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700366
Madan Jampani175e8fd2015-05-20 14:10:45 -0700367 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700368 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
369 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700370 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
371 if (error != null) {
372 log.warn("Failed to send request to master: {} to {}",
373 groupOp,
374 mastershipService.getMasterFor(groupDesc.deviceId()));
375 //TODO: Send Group operation failure event
376 } else {
377 log.debug("Sent Group operation request for device {} "
378 + "to remote MASTER {}",
379 groupDesc.deviceId(),
380 mastershipService.getMasterFor(groupDesc.deviceId()));
381 }
382 });
alshabib10580802015-02-18 18:30:33 -0800383 return;
384 }
385
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700386 log.debug("Store group for device {} is getting handled locally",
387 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800388 storeGroupDescriptionInternal(groupDesc);
389 }
390
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700391 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
392 ConcurrentMap<GroupId, Group> extraneousMap =
393 extraneousGroupEntriesById.get(deviceId);
394 if (extraneousMap == null) {
395 return null;
396 }
397 return extraneousMap.get(new DefaultGroupId(groupId));
398 }
399
400 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
401 GroupBuckets buckets) {
402 ConcurrentMap<GroupId, Group> extraneousMap =
403 extraneousGroupEntriesById.get(deviceId);
404 if (extraneousMap == null) {
405 return null;
406 }
407
408 for (Group extraneousGroup:extraneousMap.values()) {
409 if (extraneousGroup.buckets().equals(buckets)) {
410 return extraneousGroup;
411 }
412 }
413 return null;
414 }
415
alshabib10580802015-02-18 18:30:33 -0800416 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
417 // Check if a group is existing with the same key
418 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
419 return;
420 }
421
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700422 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
423 // Device group audit has not completed yet
424 // Add this group description to pending group key table
425 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700426 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700427 groupDesc.deviceId());
428 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
429 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
430 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
431 getPendingGroupKeyTable();
432 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
433 groupDesc.appCookie()),
434 group);
435 return;
436 }
437
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700438 Group matchingExtraneousGroup = null;
439 if (groupDesc.givenGroupId() != null) {
440 //Check if there is a extraneous group existing with the same Id
441 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
442 groupDesc.deviceId(), groupDesc.givenGroupId());
443 if (matchingExtraneousGroup != null) {
444 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
445 groupDesc.deviceId(),
446 groupDesc.givenGroupId());
447 //Check if the group buckets matches with user provided buckets
448 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
449 //Group is already existing with the same buckets and Id
450 // Create a group entry object
451 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
452 groupDesc.deviceId(),
453 groupDesc.givenGroupId());
454 StoredGroupEntry group = new DefaultGroup(
455 matchingExtraneousGroup.id(), groupDesc);
456 // Insert the newly created group entry into key and id maps
457 getGroupStoreKeyMap().
458 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
459 groupDesc.appCookie()), group);
460 // Ensure it also inserted into group id based table to
461 // avoid any chances of duplication in group id generation
462 getGroupIdTable(groupDesc.deviceId()).
463 put(matchingExtraneousGroup.id(), group);
464 addOrUpdateGroupEntry(matchingExtraneousGroup);
465 removeExtraneousGroupEntry(matchingExtraneousGroup);
466 return;
467 } else {
468 //Group buckets are not matching. Update group
469 //with user provided buckets.
470 //TODO
471 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
472 groupDesc.deviceId(),
473 groupDesc.givenGroupId());
474 }
475 }
476 } else {
477 //Check if there is an extraneous group with user provided buckets
478 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
479 groupDesc.deviceId(), groupDesc.buckets());
480 if (matchingExtraneousGroup != null) {
481 //Group is already existing with the same buckets.
482 //So reuse this group.
483 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
484 groupDesc.deviceId());
485 //Create a group entry object
486 StoredGroupEntry group = new DefaultGroup(
487 matchingExtraneousGroup.id(), groupDesc);
488 // Insert the newly created group entry into key and id maps
489 getGroupStoreKeyMap().
490 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
491 groupDesc.appCookie()), group);
492 // Ensure it also inserted into group id based table to
493 // avoid any chances of duplication in group id generation
494 getGroupIdTable(groupDesc.deviceId()).
495 put(matchingExtraneousGroup.id(), group);
496 addOrUpdateGroupEntry(matchingExtraneousGroup);
497 removeExtraneousGroupEntry(matchingExtraneousGroup);
498 return;
499 } else {
500 //TODO: Check if there are any empty groups that can be used here
501 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
502 groupDesc.deviceId());
503 }
504 }
505
Saurav Das100e3b82015-04-30 11:12:10 -0700506 GroupId id = null;
507 if (groupDesc.givenGroupId() == null) {
508 // Get a new group identifier
509 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
510 } else {
511 id = new DefaultGroupId(groupDesc.givenGroupId());
512 }
alshabib10580802015-02-18 18:30:33 -0800513 // Create a group entry object
514 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700515 // Insert the newly created group entry into key and id maps
516 getGroupStoreKeyMap().
517 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
518 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700519 // Ensure it also inserted into group id based table to
520 // avoid any chances of duplication in group id generation
521 getGroupIdTable(groupDesc.deviceId()).
522 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700523 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
524 id,
525 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800526 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
527 group));
528 }
529
530 /**
531 * Updates the existing group entry with the information
532 * from group description.
533 *
534 * @param deviceId the device ID
535 * @param oldAppCookie the current group key
536 * @param type update type
537 * @param newBuckets group buckets for updates
538 * @param newAppCookie optional new group key
539 */
540 @Override
541 public void updateGroupDescription(DeviceId deviceId,
542 GroupKey oldAppCookie,
543 UpdateType type,
544 GroupBuckets newBuckets,
545 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700546 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700547 if (mastershipService.getMasterFor(deviceId) != null &&
548 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700549 log.debug("updateGroupDescription: Device {} local role is not MASTER",
550 deviceId);
551 if (mastershipService.getMasterFor(deviceId) == null) {
552 log.error("No Master for device {}..."
553 + "Can not perform update group operation",
554 deviceId);
555 //TODO: Send Group operation failure event
556 return;
557 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700558 GroupStoreMessage groupOp = GroupStoreMessage.
559 createGroupUpdateRequestMsg(deviceId,
560 oldAppCookie,
561 type,
562 newBuckets,
563 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700564
Madan Jampani175e8fd2015-05-20 14:10:45 -0700565 clusterCommunicator.unicast(groupOp,
566 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
567 m -> kryoBuilder.build().serialize(m),
568 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
569 if (error != null) {
570 log.warn("Failed to send request to master: {} to {}",
571 groupOp,
572 mastershipService.getMasterFor(deviceId), error);
573 }
574 //TODO: Send Group operation failure event
575 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700576 return;
577 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700578 log.debug("updateGroupDescription for device {} is getting handled locally",
579 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700580 updateGroupDescriptionInternal(deviceId,
581 oldAppCookie,
582 type,
583 newBuckets,
584 newAppCookie);
585 }
586
587 private void updateGroupDescriptionInternal(DeviceId deviceId,
588 GroupKey oldAppCookie,
589 UpdateType type,
590 GroupBuckets newBuckets,
591 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800592 // Check if a group is existing with the provided key
593 Group oldGroup = getGroup(deviceId, oldAppCookie);
594 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700595 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800596 return;
597 }
598
599 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
600 type,
601 newBuckets);
602 if (newBucketList != null) {
603 // Create a new group object from the old group
604 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
605 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
606 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
607 oldGroup.deviceId(),
608 oldGroup.type(),
609 updatedBuckets,
610 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700611 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800612 oldGroup.appId());
613 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
614 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700615 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
616 oldGroup.id(),
617 oldGroup.deviceId(),
618 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800619 newGroup.setState(GroupState.PENDING_UPDATE);
620 newGroup.setLife(oldGroup.life());
621 newGroup.setPackets(oldGroup.packets());
622 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700623 //Update the group entry in groupkey based map.
624 //Update to groupid based map will happen in the
625 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700626 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
627 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700628 getGroupStoreKeyMap().
629 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
630 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800631 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700632 } else {
633 log.warn("updateGroupDescriptionInternal with type {}: No "
634 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800635 }
636 }
637
638 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
639 UpdateType type,
640 GroupBuckets buckets) {
641 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700642 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800643 boolean groupDescUpdated = false;
644
645 if (type == UpdateType.ADD) {
646 // Check if the any of the new buckets are part of
647 // the old bucket list
648 for (GroupBucket addBucket:buckets.buckets()) {
649 if (!newBucketList.contains(addBucket)) {
650 newBucketList.add(addBucket);
651 groupDescUpdated = true;
652 }
653 }
654 } else if (type == UpdateType.REMOVE) {
655 // Check if the to be removed buckets are part of the
656 // old bucket list
657 for (GroupBucket removeBucket:buckets.buckets()) {
658 if (newBucketList.contains(removeBucket)) {
659 newBucketList.remove(removeBucket);
660 groupDescUpdated = true;
661 }
662 }
663 }
664
665 if (groupDescUpdated) {
666 return newBucketList;
667 } else {
668 return null;
669 }
670 }
671
672 /**
673 * Triggers deleting the existing group entry.
674 *
675 * @param deviceId the device ID
676 * @param appCookie the group key
677 */
678 @Override
679 public void deleteGroupDescription(DeviceId deviceId,
680 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700681 // Check if group to be deleted by a remote instance
682 if (mastershipService.
683 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700684 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
685 deviceId);
686 if (mastershipService.getMasterFor(deviceId) == null) {
687 log.error("No Master for device {}..."
688 + "Can not perform delete group operation",
689 deviceId);
690 //TODO: Send Group operation failure event
691 return;
692 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700693 GroupStoreMessage groupOp = GroupStoreMessage.
694 createGroupDeleteRequestMsg(deviceId,
695 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700696
Madan Jampani175e8fd2015-05-20 14:10:45 -0700697 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700698 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
699 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700700 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
701 if (error != null) {
702 log.warn("Failed to send request to master: {} to {}",
703 groupOp,
704 mastershipService.getMasterFor(deviceId), error);
705 }
706 //TODO: Send Group operation failure event
707 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700708 return;
709 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700710 log.debug("deleteGroupDescription in device {} is getting handled locally",
711 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700712 deleteGroupDescriptionInternal(deviceId, appCookie);
713 }
714
715 private void deleteGroupDescriptionInternal(DeviceId deviceId,
716 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800717 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700718 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800719 if (existing == null) {
720 return;
721 }
722
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700723 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
724 existing.id(),
725 existing.deviceId(),
726 existing.state());
alshabib10580802015-02-18 18:30:33 -0800727 synchronized (existing) {
728 existing.setState(GroupState.PENDING_DELETE);
729 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700730 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
731 deviceId);
alshabib10580802015-02-18 18:30:33 -0800732 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
733 }
734
735 /**
736 * Stores a new group entry, or updates an existing entry.
737 *
738 * @param group group entry
739 */
740 @Override
741 public void addOrUpdateGroupEntry(Group group) {
742 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700743 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
744 group.id());
alshabib10580802015-02-18 18:30:33 -0800745 GroupEvent event = null;
746
747 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700748 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700749 group.id(),
750 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800751 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700752 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700753 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700754 existing.buckets().buckets()
755 .stream()
756 .filter((existingBucket)->(existingBucket.equals(bucket)))
757 .findFirst();
758 if (matchingBucket.isPresent()) {
759 ((StoredGroupBucketEntry) matchingBucket.
760 get()).setPackets(bucket.packets());
761 ((StoredGroupBucketEntry) matchingBucket.
762 get()).setBytes(bucket.bytes());
763 } else {
764 log.warn("addOrUpdateGroupEntry: No matching "
765 + "buckets to update stats");
766 }
767 }
alshabib10580802015-02-18 18:30:33 -0800768 existing.setLife(group.life());
769 existing.setPackets(group.packets());
770 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700771 if ((existing.state() == GroupState.PENDING_ADD) ||
772 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700773 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
774 existing.id(),
775 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700776 existing.state());
alshabib10580802015-02-18 18:30:33 -0800777 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700778 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800779 event = new GroupEvent(Type.GROUP_ADDED, existing);
780 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700781 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
782 existing.id(),
783 existing.deviceId(),
784 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700785 existing.setState(GroupState.ADDED);
786 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800787 event = new GroupEvent(Type.GROUP_UPDATED, existing);
788 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700789 //Re-PUT map entries to trigger map update events
790 getGroupStoreKeyMap().
791 put(new GroupStoreKeyMapKey(existing.deviceId(),
792 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800793 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700794 } else {
795 log.warn("addOrUpdateGroupEntry: Group update "
796 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800797 }
798
799 if (event != null) {
800 notifyDelegate(event);
801 }
802 }
803
804 /**
805 * Removes the group entry from store.
806 *
807 * @param group group entry
808 */
809 @Override
810 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700811 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
812 group.id());
alshabib10580802015-02-18 18:30:33 -0800813
814 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700815 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700816 group.id(),
817 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700818 //Removal from groupid based map will happen in the
819 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700820 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
821 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800822 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700823 } else {
824 log.warn("removeGroupEntry for {} in device{} is "
825 + "not existing in our maps",
826 group.id(),
827 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800828 }
829 }
830
831 @Override
832 public void deviceInitialAuditCompleted(DeviceId deviceId,
833 boolean completed) {
834 synchronized (deviceAuditStatus) {
835 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700836 log.debug("AUDIT completed for device {}",
837 deviceId);
alshabib10580802015-02-18 18:30:33 -0800838 deviceAuditStatus.put(deviceId, true);
839 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 List<StoredGroupEntry> pendingGroupRequests =
841 getPendingGroupKeyTable().values()
842 .stream()
843 .filter(g-> g.deviceId().equals(deviceId))
844 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700845 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700846 deviceId,
847 pendingGroupRequests.size());
848 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800849 GroupDescription tmp = new DefaultGroupDescription(
850 group.deviceId(),
851 group.type(),
852 group.buckets(),
853 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700854 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800855 group.appId());
856 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700857 getPendingGroupKeyTable().
858 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800859 }
alshabib10580802015-02-18 18:30:33 -0800860 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700861 Boolean audited = deviceAuditStatus.get(deviceId);
862 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700863 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800864 deviceAuditStatus.put(deviceId, false);
865 }
866 }
867 }
868 }
869
870 @Override
871 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
872 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700873 Boolean audited = deviceAuditStatus.get(deviceId);
874 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800875 }
876 }
877
878 @Override
879 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
880
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700881 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
882 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800883
884 if (existing == null) {
885 log.warn("No group entry with ID {} found ", operation.groupId());
886 return;
887 }
888
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700889 log.warn("groupOperationFailed: group operation {} failed"
890 + "for group {} in device {}",
891 operation.opType(),
892 existing.id(),
893 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800894 switch (operation.opType()) {
895 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700896 if (existing.state() == GroupState.PENDING_ADD) {
897 //TODO: Need to add support for passing the group
898 //operation failure reason from group provider.
899 //If the error type is anything other than GROUP_EXISTS,
900 //then the GROUP_ADD_FAILED event should be raised even
901 //in PENDING_ADD_RETRY state also.
902 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
903 log.warn("groupOperationFailed: cleaningup "
904 + "group {} from store in device {}....",
905 existing.id(),
906 existing.deviceId());
907 //Removal from groupid based map will happen in the
908 //map update listener
909 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
910 existing.appCookie()));
911 }
alshabib10580802015-02-18 18:30:33 -0800912 break;
913 case MODIFY:
914 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
915 break;
916 case DELETE:
917 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
918 break;
919 default:
920 log.warn("Unknown group operation type {}", operation.opType());
921 }
alshabib10580802015-02-18 18:30:33 -0800922 }
923
924 @Override
925 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700926 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700927 group.id(),
928 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800929 ConcurrentMap<GroupId, Group> extraneousIdTable =
930 getExtraneousGroupIdTable(group.deviceId());
931 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700932 // Don't remove the extraneous groups, instead re-use it when
933 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800934 }
935
936 @Override
937 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700938 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700939 group.id(),
940 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800941 ConcurrentMap<GroupId, Group> extraneousIdTable =
942 getExtraneousGroupIdTable(group.deviceId());
943 extraneousIdTable.remove(group.id());
944 }
945
946 @Override
947 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
948 // flatten and make iterator unmodifiable
949 return FluentIterable.from(
950 getExtraneousGroupIdTable(deviceId).values());
951 }
952
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700953 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700954 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700955 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700956 private class GroupStoreKeyMapListener implements
957 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700958
959 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700960 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700961 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700962 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700963 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700964 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700965 if ((key == null) && (group == null)) {
966 log.error("GroupStoreKeyMapListener: Received "
967 + "event {} with null entry", mapEvent.type());
968 return;
969 } else if (group == null) {
970 group = getGroupIdTable(key.deviceId()).values()
971 .stream()
972 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
973 .findFirst().get();
974 if (group == null) {
975 log.error("GroupStoreKeyMapListener: Received "
976 + "event {} with null entry... can not process", mapEvent.type());
977 return;
978 }
979 }
980 log.trace("received groupid map event {} for id {} in device {}",
981 mapEvent.type(),
982 group.id(),
983 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700984 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700985 // Update the group ID table
986 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700987 if (mapEvent.value().state() == Group.GroupState.ADDED) {
988 if (mapEvent.value().isGroupStateAddedFirstTime()) {
989 groupEvent = new GroupEvent(Type.GROUP_ADDED,
990 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700991 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
992 group.id(),
993 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700994 } else {
995 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
996 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700997 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
998 group.id(),
999 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001000 }
1001 }
1002 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001003 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001004 // Remove the entry from the group ID table
1005 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001006 }
1007
1008 if (groupEvent != null) {
1009 notifyDelegate(groupEvent);
1010 }
1011 }
1012 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001013
1014 private void process(GroupStoreMessage groupOp) {
1015 log.debug("Received remote group operation {} request for device {}",
1016 groupOp.type(),
1017 groupOp.deviceId());
1018 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1019 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1020 return;
1021 }
1022 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1023 storeGroupDescriptionInternal(groupOp.groupDesc());
1024 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1025 updateGroupDescriptionInternal(groupOp.deviceId(),
1026 groupOp.appCookie(),
1027 groupOp.updateType(),
1028 groupOp.updateBuckets(),
1029 groupOp.newAppCookie());
1030 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1031 deleteGroupDescriptionInternal(groupOp.deviceId(),
1032 groupOp.appCookie());
1033 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001034 }
1035
1036 /**
1037 * Flattened map key to be used to store group entries.
1038 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001039 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001040 private final DeviceId deviceId;
1041
1042 public GroupStoreMapKey(DeviceId deviceId) {
1043 this.deviceId = deviceId;
1044 }
1045
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001046 public DeviceId deviceId() {
1047 return deviceId;
1048 }
1049
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001050 @Override
1051 public boolean equals(Object o) {
1052 if (this == o) {
1053 return true;
1054 }
1055 if (!(o instanceof GroupStoreMapKey)) {
1056 return false;
1057 }
1058 GroupStoreMapKey that = (GroupStoreMapKey) o;
1059 return this.deviceId.equals(that.deviceId);
1060 }
1061
1062 @Override
1063 public int hashCode() {
1064 int result = 17;
1065
1066 result = 31 * result + Objects.hash(this.deviceId);
1067
1068 return result;
1069 }
1070 }
1071
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001072 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001073 private final GroupKey appCookie;
1074 public GroupStoreKeyMapKey(DeviceId deviceId,
1075 GroupKey appCookie) {
1076 super(deviceId);
1077 this.appCookie = appCookie;
1078 }
1079
1080 @Override
1081 public boolean equals(Object o) {
1082 if (this == o) {
1083 return true;
1084 }
1085 if (!(o instanceof GroupStoreKeyMapKey)) {
1086 return false;
1087 }
1088 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1089 return (super.equals(that) &&
1090 this.appCookie.equals(that.appCookie));
1091 }
1092
1093 @Override
1094 public int hashCode() {
1095 int result = 17;
1096
1097 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1098
1099 return result;
1100 }
1101 }
1102
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001103 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001104 private final GroupId groupId;
1105 public GroupStoreIdMapKey(DeviceId deviceId,
1106 GroupId groupId) {
1107 super(deviceId);
1108 this.groupId = groupId;
1109 }
1110
1111 @Override
1112 public boolean equals(Object o) {
1113 if (this == o) {
1114 return true;
1115 }
1116 if (!(o instanceof GroupStoreIdMapKey)) {
1117 return false;
1118 }
1119 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1120 return (super.equals(that) &&
1121 this.groupId.equals(that.groupId));
1122 }
1123
1124 @Override
1125 public int hashCode() {
1126 int result = 17;
1127
1128 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1129
1130 return result;
1131 }
1132 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001133
1134 @Override
1135 public void pushGroupMetrics(DeviceId deviceId,
1136 Collection<Group> groupEntries) {
1137 boolean deviceInitialAuditStatus =
1138 deviceInitialAuditStatus(deviceId);
1139 Set<Group> southboundGroupEntries =
1140 Sets.newHashSet(groupEntries);
1141 Set<StoredGroupEntry> storedGroupEntries =
1142 Sets.newHashSet(getStoredGroups(deviceId));
1143 Set<Group> extraneousStoredEntries =
1144 Sets.newHashSet(getExtraneousGroups(deviceId));
1145
1146 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1147 southboundGroupEntries.size(),
1148 deviceId);
1149 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1150 Group group = it.next();
1151 log.trace("Group {} in device {}", group, deviceId);
1152 }
1153
1154 log.trace("Displaying all ({}) stored group entries for device {}",
1155 storedGroupEntries.size(),
1156 deviceId);
1157 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1158 it1.hasNext();) {
1159 Group group = it1.next();
1160 log.trace("Stored Group {} for device {}", group, deviceId);
1161 }
1162
1163 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1164 Group group = it2.next();
1165 if (storedGroupEntries.remove(group)) {
1166 // we both have the group, let's update some info then.
1167 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1168 group.id(), deviceId);
1169 groupAdded(group);
1170 it2.remove();
1171 }
1172 }
1173 for (Group group : southboundGroupEntries) {
1174 if (getGroup(group.deviceId(), group.id()) != null) {
1175 // There is a group existing with the same id
1176 // It is possible that group update is
1177 // in progress while we got a stale info from switch
1178 if (!storedGroupEntries.remove(getGroup(
1179 group.deviceId(), group.id()))) {
1180 log.warn("Group AUDIT: Inconsistent state:"
1181 + "Group exists in ID based table while "
1182 + "not present in key based table");
1183 }
1184 } else {
1185 // there are groups in the switch that aren't in the store
1186 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1187 group.id(), deviceId);
1188 extraneousStoredEntries.remove(group);
1189 extraneousGroup(group);
1190 }
1191 }
1192 for (Group group : storedGroupEntries) {
1193 // there are groups in the store that aren't in the switch
1194 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1195 group.id(), deviceId);
1196 groupMissing(group);
1197 }
1198 for (Group group : extraneousStoredEntries) {
1199 // there are groups in the extraneous store that
1200 // aren't in the switch
1201 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1202 group.id(), deviceId);
1203 removeExtraneousGroupEntry(group);
1204 }
1205
1206 if (!deviceInitialAuditStatus) {
1207 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1208 deviceId);
1209 deviceInitialAuditCompleted(deviceId, true);
1210 }
1211 }
1212
1213 private void groupMissing(Group group) {
1214 switch (group.state()) {
1215 case PENDING_DELETE:
1216 log.debug("Group {} delete confirmation from device {}",
1217 group, group.deviceId());
1218 removeGroupEntry(group);
1219 break;
1220 case ADDED:
1221 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001222 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001223 case PENDING_UPDATE:
1224 log.debug("Group {} is in store but not on device {}",
1225 group, group.deviceId());
1226 StoredGroupEntry existing =
1227 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001228 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001229 existing.id(),
1230 existing.deviceId(),
1231 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001232 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001233 //Re-PUT map entries to trigger map update events
1234 getGroupStoreKeyMap().
1235 put(new GroupStoreKeyMapKey(existing.deviceId(),
1236 existing.appCookie()), existing);
1237 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1238 group));
1239 break;
1240 default:
1241 log.debug("Group {} has not been installed.", group);
1242 break;
1243 }
1244 }
1245
1246 private void extraneousGroup(Group group) {
1247 log.debug("Group {} is on device {} but not in store.",
1248 group, group.deviceId());
1249 addOrUpdateExtraneousGroupEntry(group);
1250 }
1251
1252 private void groupAdded(Group group) {
1253 log.trace("Group {} Added or Updated in device {}",
1254 group, group.deviceId());
1255 addOrUpdateGroupEntry(group);
1256 }
alshabib10580802015-02-18 18:30:33 -08001257}