blob: a999ee7f5dc6d7fb8153ccaa4e42b5c85872a268 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
alshabib10580802015-02-18 18:30:33 -080031import org.onosproject.core.DefaultGroupId;
32import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070033import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.Group;
41import org.onosproject.net.group.Group.GroupState;
42import org.onosproject.net.group.GroupBucket;
43import org.onosproject.net.group.GroupBuckets;
44import org.onosproject.net.group.GroupDescription;
45import org.onosproject.net.group.GroupEvent;
46import org.onosproject.net.group.GroupEvent.Type;
47import org.onosproject.net.group.GroupKey;
48import org.onosproject.net.group.GroupOperation;
49import org.onosproject.net.group.GroupStore;
50import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070051import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080052import org.onosproject.net.group.StoredGroupEntry;
53import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070054import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070055import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070057import org.onosproject.store.service.EventuallyConsistentMap;
58import org.onosproject.store.service.EventuallyConsistentMapBuilder;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
61import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080062import org.slf4j.Logger;
63
Jonathan Hart6ec029a2015-03-24 17:12:35 -070064import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070065import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070068import java.util.List;
69import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070070import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070072import java.util.concurrent.ConcurrentHashMap;
73import java.util.concurrent.ConcurrentMap;
74import java.util.concurrent.ExecutorService;
75import java.util.concurrent.Executors;
76import java.util.concurrent.atomic.AtomicInteger;
77import java.util.concurrent.atomic.AtomicLong;
78import java.util.stream.Collectors;
79
80import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
81import static org.onlab.util.Tools.groupedThreads;
82import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080083
84/**
 * Manages inventory of group entries using a distributed store backed by eventually consistent maps.
86 */
87@Component(immediate = true)
88@Service
89public class DistributedGroupStore
90 extends AbstractStore<GroupEvent, GroupStoreDelegate>
91 implements GroupStore {
92
93 private final Logger log = getLogger(getClass());
94
95 private final int dummyId = 0xffffffff;
96 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
97
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService clusterCommunicator;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterService clusterService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700105 protected StorageService storageService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700108 protected MastershipService mastershipService;
109
110 // Per device group table with (device id + app cookie) as key
111 private EventuallyConsistentMap<GroupStoreKeyMapKey,
112 StoredGroupEntry> groupStoreEntriesByKey = null;
113 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700114 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
115 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700116 private EventuallyConsistentMap<GroupStoreKeyMapKey,
117 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800118 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
119 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700120 private ExecutorService messageHandlingExecutor;
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800122
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700123 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800124
125 private final AtomicInteger groupIdGen = new AtomicInteger();
126
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700127 private KryoNamespace.Builder kryoBuilder = null;
128
Madan Jampanibcf1a482015-06-24 19:05:56 -0700129 private final AtomicLong sequenceNumber = new AtomicLong(0);
130
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700131 private KryoNamespace clusterMsgSerializer;
132
alshabib10580802015-02-18 18:30:33 -0800133 @Activate
134 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700136 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700137 .register(DefaultGroup.class,
138 DefaultGroupBucket.class,
139 DefaultGroupDescription.class,
140 DefaultGroupKey.class,
141 GroupDescription.Type.class,
142 Group.GroupState.class,
143 GroupBuckets.class,
144 DefaultGroupId.class,
145 GroupStoreMessage.class,
146 GroupStoreMessage.Type.class,
147 UpdateType.class,
148 GroupStoreMessageSubjects.class,
149 MultiValuedTimestamp.class,
150 GroupStoreKeyMapKey.class,
151 GroupStoreIdMapKey.class,
152 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700153 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700154
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700155 clusterMsgSerializer = kryoBuilder.build();
156
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700157 messageHandlingExecutor = Executors.
158 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
159 groupedThreads("onos/store/group",
160 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700161
162 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700163 clusterMsgSerializer::deserialize,
Madan Jampani01e05fb2015-08-13 13:29:36 -0700164 this::process,
165 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700166
167 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700168 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
169 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
170
171 groupStoreEntriesByKey = keyMapBuilder
172 .withName("groupstorekeymap")
173 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700174 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
175 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700176 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700177 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700178 log.debug("Current size of groupstorekeymap:{}",
179 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700180
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700181 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700182 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
183 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
184
185 auditPendingReqQueue = auditMapBuilder
186 .withName("pendinggroupkeymap")
187 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700188 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
189 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700190 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700191 log.debug("Current size of pendinggroupkeymap:{}",
192 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700193
alshabib10580802015-02-18 18:30:33 -0800194 log.info("Started");
195 }
196
197 @Deactivate
198 public void deactivate() {
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700199 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700200 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700201 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800202 log.info("Stopped");
203 }
204
alshabib10580802015-02-18 18:30:33 -0800205 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700206 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800207 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
208 }
209
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700210 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
211 lazyEmptyGroupIdTable() {
212 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
213 }
214
alshabib10580802015-02-18 18:30:33 -0800215 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700216 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800217 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800219 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700220 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
221 getGroupStoreKeyMap() {
222 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800223 }
224
225 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700226 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800227 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700228 * @param deviceId identifier of the device
229 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800230 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700231 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
232 return createIfAbsentUnchecked(groupEntriesById,
233 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800234 }
235
236 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700237 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800238 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700239 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800240 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700241 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
242 getPendingGroupKeyTable() {
243 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800244 }
245
246 /**
247 * Returns the extraneous group id table for specified device.
248 *
249 * @param deviceId identifier of the device
250 * @return Map representing group key table of given device.
251 */
252 private ConcurrentMap<GroupId, Group>
253 getExtraneousGroupIdTable(DeviceId deviceId) {
254 return createIfAbsentUnchecked(extraneousGroupEntriesById,
255 deviceId,
256 lazyEmptyExtraneousGroupIdTable());
257 }
258
259 /**
260 * Returns the number of groups for the specified device in the store.
261 *
262 * @return number of groups for the specified device
263 */
264 @Override
265 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700266 return (getGroups(deviceId) != null) ?
267 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800268 }
269
270 /**
271 * Returns the groups associated with a device.
272 *
273 * @param deviceId the device ID
274 *
275 * @return the group entries
276 */
277 @Override
278 public Iterable<Group> getGroups(DeviceId deviceId) {
279 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700280 return FluentIterable.from(getGroupStoreKeyMap().values())
281 .filter(input -> input.deviceId().equals(deviceId))
282 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800283 }
284
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700285 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
286 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700287 return FluentIterable.from(getGroupStoreKeyMap().values())
288 .filter(input -> input.deviceId().equals(deviceId));
289 }
290
alshabib10580802015-02-18 18:30:33 -0800291 /**
292 * Returns the stored group entry.
293 *
294 * @param deviceId the device ID
295 * @param appCookie the group key
296 *
297 * @return a group associated with the key
298 */
299 @Override
300 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700301 return getStoredGroupEntry(deviceId, appCookie);
302 }
303
304 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
305 GroupKey appCookie) {
306 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
307 appCookie));
308 }
309
310 @Override
311 public Group getGroup(DeviceId deviceId, GroupId groupId) {
312 return getStoredGroupEntry(deviceId, groupId);
313 }
314
315 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
316 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700317 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800318 }
319
320 private int getFreeGroupIdValue(DeviceId deviceId) {
321 int freeId = groupIdGen.incrementAndGet();
322
323 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700324 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800325 if (existing == null) {
326 existing = (
327 extraneousGroupEntriesById.get(deviceId) != null) ?
328 extraneousGroupEntriesById.get(deviceId).
329 get(new DefaultGroupId(freeId)) :
330 null;
331 }
332 if (existing != null) {
333 freeId = groupIdGen.incrementAndGet();
334 } else {
335 break;
336 }
337 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700338 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800339 return freeId;
340 }
341
342 /**
343 * Stores a new group entry using the information from group description.
344 *
345 * @param groupDesc group description to be used to create group entry
346 */
347 @Override
348 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700349 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800350 // Check if a group is existing with the same key
351 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700352 log.warn("Group already exists with the same key {}",
353 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800354 return;
355 }
356
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700357 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700358 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700359 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700360 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700361 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
362 log.error("No Master for device {}..."
363 + "Can not perform add group operation",
364 groupDesc.deviceId());
365 //TODO: Send Group operation failure event
366 return;
367 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700368 GroupStoreMessage groupOp = GroupStoreMessage.
369 createGroupAddRequestMsg(groupDesc.deviceId(),
370 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700371
Madan Jampani175e8fd2015-05-20 14:10:45 -0700372 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700373 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700374 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700375 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
376 if (error != null) {
377 log.warn("Failed to send request to master: {} to {}",
378 groupOp,
379 mastershipService.getMasterFor(groupDesc.deviceId()));
380 //TODO: Send Group operation failure event
381 } else {
382 log.debug("Sent Group operation request for device {} "
383 + "to remote MASTER {}",
384 groupDesc.deviceId(),
385 mastershipService.getMasterFor(groupDesc.deviceId()));
386 }
387 });
alshabib10580802015-02-18 18:30:33 -0800388 return;
389 }
390
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700391 log.debug("Store group for device {} is getting handled locally",
392 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800393 storeGroupDescriptionInternal(groupDesc);
394 }
395
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700396 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
397 ConcurrentMap<GroupId, Group> extraneousMap =
398 extraneousGroupEntriesById.get(deviceId);
399 if (extraneousMap == null) {
400 return null;
401 }
402 return extraneousMap.get(new DefaultGroupId(groupId));
403 }
404
405 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
406 GroupBuckets buckets) {
407 ConcurrentMap<GroupId, Group> extraneousMap =
408 extraneousGroupEntriesById.get(deviceId);
409 if (extraneousMap == null) {
410 return null;
411 }
412
413 for (Group extraneousGroup:extraneousMap.values()) {
414 if (extraneousGroup.buckets().equals(buckets)) {
415 return extraneousGroup;
416 }
417 }
418 return null;
419 }
420
alshabib10580802015-02-18 18:30:33 -0800421 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
422 // Check if a group is existing with the same key
423 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
424 return;
425 }
426
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700427 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
428 // Device group audit has not completed yet
429 // Add this group description to pending group key table
430 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700431 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700432 groupDesc.deviceId());
433 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
434 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
435 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
436 getPendingGroupKeyTable();
437 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
438 groupDesc.appCookie()),
439 group);
440 return;
441 }
442
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700443 Group matchingExtraneousGroup = null;
444 if (groupDesc.givenGroupId() != null) {
445 //Check if there is a extraneous group existing with the same Id
446 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
447 groupDesc.deviceId(), groupDesc.givenGroupId());
448 if (matchingExtraneousGroup != null) {
449 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
450 groupDesc.deviceId(),
451 groupDesc.givenGroupId());
452 //Check if the group buckets matches with user provided buckets
453 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
454 //Group is already existing with the same buckets and Id
455 // Create a group entry object
456 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
457 groupDesc.deviceId(),
458 groupDesc.givenGroupId());
459 StoredGroupEntry group = new DefaultGroup(
460 matchingExtraneousGroup.id(), groupDesc);
461 // Insert the newly created group entry into key and id maps
462 getGroupStoreKeyMap().
463 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
464 groupDesc.appCookie()), group);
465 // Ensure it also inserted into group id based table to
466 // avoid any chances of duplication in group id generation
467 getGroupIdTable(groupDesc.deviceId()).
468 put(matchingExtraneousGroup.id(), group);
469 addOrUpdateGroupEntry(matchingExtraneousGroup);
470 removeExtraneousGroupEntry(matchingExtraneousGroup);
471 return;
472 } else {
473 //Group buckets are not matching. Update group
474 //with user provided buckets.
475 //TODO
476 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
477 groupDesc.deviceId(),
478 groupDesc.givenGroupId());
479 }
480 }
481 } else {
482 //Check if there is an extraneous group with user provided buckets
483 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
484 groupDesc.deviceId(), groupDesc.buckets());
485 if (matchingExtraneousGroup != null) {
486 //Group is already existing with the same buckets.
487 //So reuse this group.
488 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
489 groupDesc.deviceId());
490 //Create a group entry object
491 StoredGroupEntry group = new DefaultGroup(
492 matchingExtraneousGroup.id(), groupDesc);
493 // Insert the newly created group entry into key and id maps
494 getGroupStoreKeyMap().
495 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
496 groupDesc.appCookie()), group);
497 // Ensure it also inserted into group id based table to
498 // avoid any chances of duplication in group id generation
499 getGroupIdTable(groupDesc.deviceId()).
500 put(matchingExtraneousGroup.id(), group);
501 addOrUpdateGroupEntry(matchingExtraneousGroup);
502 removeExtraneousGroupEntry(matchingExtraneousGroup);
503 return;
504 } else {
505 //TODO: Check if there are any empty groups that can be used here
506 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
507 groupDesc.deviceId());
508 }
509 }
510
Saurav Das100e3b82015-04-30 11:12:10 -0700511 GroupId id = null;
512 if (groupDesc.givenGroupId() == null) {
513 // Get a new group identifier
514 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
515 } else {
516 id = new DefaultGroupId(groupDesc.givenGroupId());
517 }
alshabib10580802015-02-18 18:30:33 -0800518 // Create a group entry object
519 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700520 // Insert the newly created group entry into key and id maps
521 getGroupStoreKeyMap().
522 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
523 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700524 // Ensure it also inserted into group id based table to
525 // avoid any chances of duplication in group id generation
526 getGroupIdTable(groupDesc.deviceId()).
527 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700528 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
529 id,
530 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800531 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
532 group));
533 }
534
535 /**
536 * Updates the existing group entry with the information
537 * from group description.
538 *
539 * @param deviceId the device ID
540 * @param oldAppCookie the current group key
541 * @param type update type
542 * @param newBuckets group buckets for updates
543 * @param newAppCookie optional new group key
544 */
545 @Override
546 public void updateGroupDescription(DeviceId deviceId,
547 GroupKey oldAppCookie,
548 UpdateType type,
549 GroupBuckets newBuckets,
550 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700551 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700552 if (mastershipService.getMasterFor(deviceId) != null &&
553 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700554 log.debug("updateGroupDescription: Device {} local role is not MASTER",
555 deviceId);
556 if (mastershipService.getMasterFor(deviceId) == null) {
557 log.error("No Master for device {}..."
558 + "Can not perform update group operation",
559 deviceId);
560 //TODO: Send Group operation failure event
561 return;
562 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700563 GroupStoreMessage groupOp = GroupStoreMessage.
564 createGroupUpdateRequestMsg(deviceId,
565 oldAppCookie,
566 type,
567 newBuckets,
568 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700569
Madan Jampani175e8fd2015-05-20 14:10:45 -0700570 clusterCommunicator.unicast(groupOp,
571 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700572 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700573 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
574 if (error != null) {
575 log.warn("Failed to send request to master: {} to {}",
576 groupOp,
577 mastershipService.getMasterFor(deviceId), error);
578 }
579 //TODO: Send Group operation failure event
580 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700581 return;
582 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700583 log.debug("updateGroupDescription for device {} is getting handled locally",
584 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700585 updateGroupDescriptionInternal(deviceId,
586 oldAppCookie,
587 type,
588 newBuckets,
589 newAppCookie);
590 }
591
592 private void updateGroupDescriptionInternal(DeviceId deviceId,
593 GroupKey oldAppCookie,
594 UpdateType type,
595 GroupBuckets newBuckets,
596 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800597 // Check if a group is existing with the provided key
598 Group oldGroup = getGroup(deviceId, oldAppCookie);
599 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700600 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800601 return;
602 }
603
604 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
605 type,
606 newBuckets);
607 if (newBucketList != null) {
608 // Create a new group object from the old group
609 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
610 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
611 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
612 oldGroup.deviceId(),
613 oldGroup.type(),
614 updatedBuckets,
615 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700616 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800617 oldGroup.appId());
618 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
619 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700620 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
621 oldGroup.id(),
622 oldGroup.deviceId(),
623 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800624 newGroup.setState(GroupState.PENDING_UPDATE);
625 newGroup.setLife(oldGroup.life());
626 newGroup.setPackets(oldGroup.packets());
627 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700628 //Update the group entry in groupkey based map.
629 //Update to groupid based map will happen in the
630 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700631 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
632 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700633 getGroupStoreKeyMap().
634 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
635 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800636 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700637 } else {
638 log.warn("updateGroupDescriptionInternal with type {}: No "
639 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800640 }
641 }
642
643 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
644 UpdateType type,
645 GroupBuckets buckets) {
646 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700647 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800648 boolean groupDescUpdated = false;
649
650 if (type == UpdateType.ADD) {
651 // Check if the any of the new buckets are part of
652 // the old bucket list
653 for (GroupBucket addBucket:buckets.buckets()) {
654 if (!newBucketList.contains(addBucket)) {
655 newBucketList.add(addBucket);
656 groupDescUpdated = true;
657 }
658 }
659 } else if (type == UpdateType.REMOVE) {
660 // Check if the to be removed buckets are part of the
661 // old bucket list
662 for (GroupBucket removeBucket:buckets.buckets()) {
663 if (newBucketList.contains(removeBucket)) {
664 newBucketList.remove(removeBucket);
665 groupDescUpdated = true;
666 }
667 }
668 }
669
670 if (groupDescUpdated) {
671 return newBucketList;
672 } else {
673 return null;
674 }
675 }
676
677 /**
678 * Triggers deleting the existing group entry.
679 *
680 * @param deviceId the device ID
681 * @param appCookie the group key
682 */
683 @Override
684 public void deleteGroupDescription(DeviceId deviceId,
685 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700686 // Check if group to be deleted by a remote instance
687 if (mastershipService.
688 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700689 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
690 deviceId);
691 if (mastershipService.getMasterFor(deviceId) == null) {
692 log.error("No Master for device {}..."
693 + "Can not perform delete group operation",
694 deviceId);
695 //TODO: Send Group operation failure event
696 return;
697 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700698 GroupStoreMessage groupOp = GroupStoreMessage.
699 createGroupDeleteRequestMsg(deviceId,
700 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700701
Madan Jampani175e8fd2015-05-20 14:10:45 -0700702 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700703 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700704 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700705 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
706 if (error != null) {
707 log.warn("Failed to send request to master: {} to {}",
708 groupOp,
709 mastershipService.getMasterFor(deviceId), error);
710 }
711 //TODO: Send Group operation failure event
712 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700713 return;
714 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700715 log.debug("deleteGroupDescription in device {} is getting handled locally",
716 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700717 deleteGroupDescriptionInternal(deviceId, appCookie);
718 }
719
720 private void deleteGroupDescriptionInternal(DeviceId deviceId,
721 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800722 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700723 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800724 if (existing == null) {
725 return;
726 }
727
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700728 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
729 existing.id(),
730 existing.deviceId(),
731 existing.state());
alshabib10580802015-02-18 18:30:33 -0800732 synchronized (existing) {
733 existing.setState(GroupState.PENDING_DELETE);
734 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700735 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
736 deviceId);
alshabib10580802015-02-18 18:30:33 -0800737 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
738 }
739
740 /**
741 * Stores a new group entry, or updates an existing entry.
742 *
743 * @param group group entry
744 */
745 @Override
746 public void addOrUpdateGroupEntry(Group group) {
747 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700748 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
749 group.id());
alshabib10580802015-02-18 18:30:33 -0800750 GroupEvent event = null;
751
752 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700753 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700754 group.id(),
755 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800756 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700757 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700758 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700759 existing.buckets().buckets()
760 .stream()
761 .filter((existingBucket)->(existingBucket.equals(bucket)))
762 .findFirst();
763 if (matchingBucket.isPresent()) {
764 ((StoredGroupBucketEntry) matchingBucket.
765 get()).setPackets(bucket.packets());
766 ((StoredGroupBucketEntry) matchingBucket.
767 get()).setBytes(bucket.bytes());
768 } else {
769 log.warn("addOrUpdateGroupEntry: No matching "
770 + "buckets to update stats");
771 }
772 }
alshabib10580802015-02-18 18:30:33 -0800773 existing.setLife(group.life());
774 existing.setPackets(group.packets());
775 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700776 if ((existing.state() == GroupState.PENDING_ADD) ||
777 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700778 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
779 existing.id(),
780 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700781 existing.state());
alshabib10580802015-02-18 18:30:33 -0800782 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700783 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800784 event = new GroupEvent(Type.GROUP_ADDED, existing);
785 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700786 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
787 existing.id(),
788 existing.deviceId(),
789 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700790 existing.setState(GroupState.ADDED);
791 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800792 event = new GroupEvent(Type.GROUP_UPDATED, existing);
793 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700794 //Re-PUT map entries to trigger map update events
795 getGroupStoreKeyMap().
796 put(new GroupStoreKeyMapKey(existing.deviceId(),
797 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800798 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700799 } else {
800 log.warn("addOrUpdateGroupEntry: Group update "
801 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800802 }
803
804 if (event != null) {
805 notifyDelegate(event);
806 }
807 }
808
809 /**
810 * Removes the group entry from store.
811 *
812 * @param group group entry
813 */
814 @Override
815 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700816 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
817 group.id());
alshabib10580802015-02-18 18:30:33 -0800818
819 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700820 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700821 group.id(),
822 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700823 //Removal from groupid based map will happen in the
824 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700825 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
826 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800827 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700828 } else {
829 log.warn("removeGroupEntry for {} in device{} is "
830 + "not existing in our maps",
831 group.id(),
832 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800833 }
834 }
835
836 @Override
837 public void deviceInitialAuditCompleted(DeviceId deviceId,
838 boolean completed) {
839 synchronized (deviceAuditStatus) {
840 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700841 log.debug("AUDIT completed for device {}",
842 deviceId);
alshabib10580802015-02-18 18:30:33 -0800843 deviceAuditStatus.put(deviceId, true);
844 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700845 List<StoredGroupEntry> pendingGroupRequests =
846 getPendingGroupKeyTable().values()
847 .stream()
848 .filter(g-> g.deviceId().equals(deviceId))
849 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700850 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700851 deviceId,
852 pendingGroupRequests.size());
853 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800854 GroupDescription tmp = new DefaultGroupDescription(
855 group.deviceId(),
856 group.type(),
857 group.buckets(),
858 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700859 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800860 group.appId());
861 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700862 getPendingGroupKeyTable().
863 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800864 }
alshabib10580802015-02-18 18:30:33 -0800865 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700866 Boolean audited = deviceAuditStatus.get(deviceId);
867 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700868 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800869 deviceAuditStatus.put(deviceId, false);
870 }
871 }
872 }
873 }
874
875 @Override
876 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
877 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700878 Boolean audited = deviceAuditStatus.get(deviceId);
879 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800880 }
881 }
882
883 @Override
884 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
885
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700886 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
887 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800888
889 if (existing == null) {
890 log.warn("No group entry with ID {} found ", operation.groupId());
891 return;
892 }
893
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700894 log.warn("groupOperationFailed: group operation {} failed"
895 + "for group {} in device {}",
896 operation.opType(),
897 existing.id(),
898 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800899 switch (operation.opType()) {
900 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700901 if (existing.state() == GroupState.PENDING_ADD) {
902 //TODO: Need to add support for passing the group
903 //operation failure reason from group provider.
904 //If the error type is anything other than GROUP_EXISTS,
905 //then the GROUP_ADD_FAILED event should be raised even
906 //in PENDING_ADD_RETRY state also.
907 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
908 log.warn("groupOperationFailed: cleaningup "
909 + "group {} from store in device {}....",
910 existing.id(),
911 existing.deviceId());
912 //Removal from groupid based map will happen in the
913 //map update listener
914 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
915 existing.appCookie()));
916 }
alshabib10580802015-02-18 18:30:33 -0800917 break;
918 case MODIFY:
919 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
920 break;
921 case DELETE:
922 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
923 break;
924 default:
925 log.warn("Unknown group operation type {}", operation.opType());
926 }
alshabib10580802015-02-18 18:30:33 -0800927 }
928
929 @Override
930 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700931 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700932 group.id(),
933 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800934 ConcurrentMap<GroupId, Group> extraneousIdTable =
935 getExtraneousGroupIdTable(group.deviceId());
936 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700937 // Don't remove the extraneous groups, instead re-use it when
938 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800939 }
940
941 @Override
942 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700943 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700944 group.id(),
945 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800946 ConcurrentMap<GroupId, Group> extraneousIdTable =
947 getExtraneousGroupIdTable(group.deviceId());
948 extraneousIdTable.remove(group.id());
949 }
950
951 @Override
952 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
953 // flatten and make iterator unmodifiable
954 return FluentIterable.from(
955 getExtraneousGroupIdTable(deviceId).values());
956 }
957
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700958 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700959 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700960 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700961 private class GroupStoreKeyMapListener implements
962 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700963
964 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700965 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700966 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700967 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700968 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700969 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700970 if ((key == null) && (group == null)) {
971 log.error("GroupStoreKeyMapListener: Received "
972 + "event {} with null entry", mapEvent.type());
973 return;
974 } else if (group == null) {
975 group = getGroupIdTable(key.deviceId()).values()
976 .stream()
977 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
978 .findFirst().get();
979 if (group == null) {
980 log.error("GroupStoreKeyMapListener: Received "
981 + "event {} with null entry... can not process", mapEvent.type());
982 return;
983 }
984 }
985 log.trace("received groupid map event {} for id {} in device {}",
986 mapEvent.type(),
987 group.id(),
988 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700989 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700990 // Update the group ID table
991 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700992 if (mapEvent.value().state() == Group.GroupState.ADDED) {
993 if (mapEvent.value().isGroupStateAddedFirstTime()) {
994 groupEvent = new GroupEvent(Type.GROUP_ADDED,
995 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700996 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
997 group.id(),
998 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700999 } else {
1000 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1001 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001002 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1003 group.id(),
1004 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001005 }
1006 }
1007 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001008 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001009 // Remove the entry from the group ID table
1010 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001011 }
1012
1013 if (groupEvent != null) {
1014 notifyDelegate(groupEvent);
1015 }
1016 }
1017 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001018
1019 private void process(GroupStoreMessage groupOp) {
1020 log.debug("Received remote group operation {} request for device {}",
1021 groupOp.type(),
1022 groupOp.deviceId());
1023 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1024 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1025 return;
1026 }
1027 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1028 storeGroupDescriptionInternal(groupOp.groupDesc());
1029 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1030 updateGroupDescriptionInternal(groupOp.deviceId(),
1031 groupOp.appCookie(),
1032 groupOp.updateType(),
1033 groupOp.updateBuckets(),
1034 groupOp.newAppCookie());
1035 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1036 deleteGroupDescriptionInternal(groupOp.deviceId(),
1037 groupOp.appCookie());
1038 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001039 }
1040
1041 /**
1042 * Flattened map key to be used to store group entries.
1043 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001044 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001045 private final DeviceId deviceId;
1046
1047 public GroupStoreMapKey(DeviceId deviceId) {
1048 this.deviceId = deviceId;
1049 }
1050
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001051 public DeviceId deviceId() {
1052 return deviceId;
1053 }
1054
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001055 @Override
1056 public boolean equals(Object o) {
1057 if (this == o) {
1058 return true;
1059 }
1060 if (!(o instanceof GroupStoreMapKey)) {
1061 return false;
1062 }
1063 GroupStoreMapKey that = (GroupStoreMapKey) o;
1064 return this.deviceId.equals(that.deviceId);
1065 }
1066
1067 @Override
1068 public int hashCode() {
1069 int result = 17;
1070
1071 result = 31 * result + Objects.hash(this.deviceId);
1072
1073 return result;
1074 }
1075 }
1076
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001077 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001078 private final GroupKey appCookie;
1079 public GroupStoreKeyMapKey(DeviceId deviceId,
1080 GroupKey appCookie) {
1081 super(deviceId);
1082 this.appCookie = appCookie;
1083 }
1084
1085 @Override
1086 public boolean equals(Object o) {
1087 if (this == o) {
1088 return true;
1089 }
1090 if (!(o instanceof GroupStoreKeyMapKey)) {
1091 return false;
1092 }
1093 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1094 return (super.equals(that) &&
1095 this.appCookie.equals(that.appCookie));
1096 }
1097
1098 @Override
1099 public int hashCode() {
1100 int result = 17;
1101
1102 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1103
1104 return result;
1105 }
1106 }
1107
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001108 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001109 private final GroupId groupId;
1110 public GroupStoreIdMapKey(DeviceId deviceId,
1111 GroupId groupId) {
1112 super(deviceId);
1113 this.groupId = groupId;
1114 }
1115
1116 @Override
1117 public boolean equals(Object o) {
1118 if (this == o) {
1119 return true;
1120 }
1121 if (!(o instanceof GroupStoreIdMapKey)) {
1122 return false;
1123 }
1124 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1125 return (super.equals(that) &&
1126 this.groupId.equals(that.groupId));
1127 }
1128
1129 @Override
1130 public int hashCode() {
1131 int result = 17;
1132
1133 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1134
1135 return result;
1136 }
1137 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001138
1139 @Override
1140 public void pushGroupMetrics(DeviceId deviceId,
1141 Collection<Group> groupEntries) {
1142 boolean deviceInitialAuditStatus =
1143 deviceInitialAuditStatus(deviceId);
1144 Set<Group> southboundGroupEntries =
1145 Sets.newHashSet(groupEntries);
1146 Set<StoredGroupEntry> storedGroupEntries =
1147 Sets.newHashSet(getStoredGroups(deviceId));
1148 Set<Group> extraneousStoredEntries =
1149 Sets.newHashSet(getExtraneousGroups(deviceId));
1150
1151 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1152 southboundGroupEntries.size(),
1153 deviceId);
1154 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1155 Group group = it.next();
1156 log.trace("Group {} in device {}", group, deviceId);
1157 }
1158
1159 log.trace("Displaying all ({}) stored group entries for device {}",
1160 storedGroupEntries.size(),
1161 deviceId);
1162 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1163 it1.hasNext();) {
1164 Group group = it1.next();
1165 log.trace("Stored Group {} for device {}", group, deviceId);
1166 }
1167
1168 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1169 Group group = it2.next();
1170 if (storedGroupEntries.remove(group)) {
1171 // we both have the group, let's update some info then.
1172 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1173 group.id(), deviceId);
1174 groupAdded(group);
1175 it2.remove();
1176 }
1177 }
1178 for (Group group : southboundGroupEntries) {
1179 if (getGroup(group.deviceId(), group.id()) != null) {
1180 // There is a group existing with the same id
1181 // It is possible that group update is
1182 // in progress while we got a stale info from switch
1183 if (!storedGroupEntries.remove(getGroup(
1184 group.deviceId(), group.id()))) {
1185 log.warn("Group AUDIT: Inconsistent state:"
1186 + "Group exists in ID based table while "
1187 + "not present in key based table");
1188 }
1189 } else {
1190 // there are groups in the switch that aren't in the store
1191 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1192 group.id(), deviceId);
1193 extraneousStoredEntries.remove(group);
1194 extraneousGroup(group);
1195 }
1196 }
1197 for (Group group : storedGroupEntries) {
1198 // there are groups in the store that aren't in the switch
1199 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1200 group.id(), deviceId);
1201 groupMissing(group);
1202 }
1203 for (Group group : extraneousStoredEntries) {
1204 // there are groups in the extraneous store that
1205 // aren't in the switch
1206 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1207 group.id(), deviceId);
1208 removeExtraneousGroupEntry(group);
1209 }
1210
1211 if (!deviceInitialAuditStatus) {
1212 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1213 deviceId);
1214 deviceInitialAuditCompleted(deviceId, true);
1215 }
1216 }
1217
1218 private void groupMissing(Group group) {
1219 switch (group.state()) {
1220 case PENDING_DELETE:
1221 log.debug("Group {} delete confirmation from device {}",
1222 group, group.deviceId());
1223 removeGroupEntry(group);
1224 break;
1225 case ADDED:
1226 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001227 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001228 case PENDING_UPDATE:
1229 log.debug("Group {} is in store but not on device {}",
1230 group, group.deviceId());
1231 StoredGroupEntry existing =
1232 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001233 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001234 existing.id(),
1235 existing.deviceId(),
1236 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001237 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001238 //Re-PUT map entries to trigger map update events
1239 getGroupStoreKeyMap().
1240 put(new GroupStoreKeyMapKey(existing.deviceId(),
1241 existing.appCookie()), existing);
1242 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1243 group));
1244 break;
1245 default:
1246 log.debug("Group {} has not been installed.", group);
1247 break;
1248 }
1249 }
1250
1251 private void extraneousGroup(Group group) {
1252 log.debug("Group {} is on device {} but not in store.",
1253 group, group.deviceId());
1254 addOrUpdateExtraneousGroupEntry(group);
1255 }
1256
1257 private void groupAdded(Group group) {
1258 log.trace("Group {} Added or Updated in device {}",
1259 group, group.deviceId());
1260 addOrUpdateGroupEntry(group);
1261 }
alshabib10580802015-02-18 18:30:33 -08001262}