blob: cc32a735e1c2ff7be05db13d18694d12b2f48f0c [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
alshabib10580802015-02-18 18:30:33 -080031import org.onosproject.core.DefaultGroupId;
32import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070033import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.Group;
41import org.onosproject.net.group.Group.GroupState;
42import org.onosproject.net.group.GroupBucket;
43import org.onosproject.net.group.GroupBuckets;
44import org.onosproject.net.group.GroupDescription;
45import org.onosproject.net.group.GroupEvent;
46import org.onosproject.net.group.GroupEvent.Type;
47import org.onosproject.net.group.GroupKey;
48import org.onosproject.net.group.GroupOperation;
49import org.onosproject.net.group.GroupStore;
50import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070051import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080052import org.onosproject.net.group.StoredGroupEntry;
53import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070054import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070055import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070057import org.onosproject.store.service.EventuallyConsistentMap;
58import org.onosproject.store.service.EventuallyConsistentMapBuilder;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
61import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080062import org.slf4j.Logger;
63
Jonathan Hart6ec029a2015-03-24 17:12:35 -070064import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070065import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070068import java.util.List;
69import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070070import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070072import java.util.concurrent.ConcurrentHashMap;
73import java.util.concurrent.ConcurrentMap;
74import java.util.concurrent.ExecutorService;
75import java.util.concurrent.Executors;
76import java.util.concurrent.atomic.AtomicInteger;
77import java.util.concurrent.atomic.AtomicLong;
78import java.util.stream.Collectors;
79
80import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
81import static org.onlab.util.Tools.groupedThreads;
82import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080083
84/**
85 * Manages inventory of group entries using trivial in-memory implementation.
86 */
87@Component(immediate = true)
88@Service
89public class DistributedGroupStore
90 extends AbstractStore<GroupEvent, GroupStoreDelegate>
91 implements GroupStore {
92
93 private final Logger log = getLogger(getClass());
94
95 private final int dummyId = 0xffffffff;
96 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
97
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService clusterCommunicator;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterService clusterService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700105 protected StorageService storageService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700108 protected MastershipService mastershipService;
109
110 // Per device group table with (device id + app cookie) as key
111 private EventuallyConsistentMap<GroupStoreKeyMapKey,
112 StoredGroupEntry> groupStoreEntriesByKey = null;
113 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700114 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
115 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700116 private EventuallyConsistentMap<GroupStoreKeyMapKey,
117 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800118 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
119 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700120 private ExecutorService messageHandlingExecutor;
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800122
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700123 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800124
125 private final AtomicInteger groupIdGen = new AtomicInteger();
126
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700127 private KryoNamespace.Builder kryoBuilder = null;
128
Madan Jampanibcf1a482015-06-24 19:05:56 -0700129 private final AtomicLong sequenceNumber = new AtomicLong(0);
130
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700131 private KryoNamespace clusterMsgSerializer;
132
alshabib10580802015-02-18 18:30:33 -0800133 @Activate
134 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700136 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700137 .register(DefaultGroup.class,
138 DefaultGroupBucket.class,
139 DefaultGroupDescription.class,
140 DefaultGroupKey.class,
141 GroupDescription.Type.class,
142 Group.GroupState.class,
143 GroupBuckets.class,
144 DefaultGroupId.class,
145 GroupStoreMessage.class,
146 GroupStoreMessage.Type.class,
147 UpdateType.class,
148 GroupStoreMessageSubjects.class,
149 MultiValuedTimestamp.class,
150 GroupStoreKeyMapKey.class,
151 GroupStoreIdMapKey.class,
152 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700153 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700154
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700155 clusterMsgSerializer = kryoBuilder.build();
156
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700157 messageHandlingExecutor = Executors.
158 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
159 groupedThreads("onos/store/group",
160 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700161
162 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700163 clusterMsgSerializer::deserialize,
Madan Jampani01e05fb2015-08-13 13:29:36 -0700164 this::process,
165 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700166
167 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700168 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
169 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
170
171 groupStoreEntriesByKey = keyMapBuilder
172 .withName("groupstorekeymap")
173 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700174 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
175 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700176 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700177 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700178 log.debug("Current size of groupstorekeymap:{}",
179 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700180
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700181 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700182 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
183 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
184
185 auditPendingReqQueue = auditMapBuilder
186 .withName("pendinggroupkeymap")
187 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700188 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
189 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700190 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700191 log.debug("Current size of pendinggroupkeymap:{}",
192 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700193
alshabib10580802015-02-18 18:30:33 -0800194 log.info("Started");
195 }
196
197 @Deactivate
198 public void deactivate() {
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700199 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700200 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700201 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800202 log.info("Stopped");
203 }
204
alshabib10580802015-02-18 18:30:33 -0800205 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700206 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800207 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
208 }
209
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700210 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
211 lazyEmptyGroupIdTable() {
212 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
213 }
214
alshabib10580802015-02-18 18:30:33 -0800215 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700216 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800217 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800219 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700220 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
221 getGroupStoreKeyMap() {
222 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800223 }
224
225 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700226 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800227 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700228 * @param deviceId identifier of the device
229 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800230 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700231 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
232 return createIfAbsentUnchecked(groupEntriesById,
233 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800234 }
235
236 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700237 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800238 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700239 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800240 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700241 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
242 getPendingGroupKeyTable() {
243 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800244 }
245
246 /**
247 * Returns the extraneous group id table for specified device.
248 *
249 * @param deviceId identifier of the device
250 * @return Map representing group key table of given device.
251 */
252 private ConcurrentMap<GroupId, Group>
253 getExtraneousGroupIdTable(DeviceId deviceId) {
254 return createIfAbsentUnchecked(extraneousGroupEntriesById,
255 deviceId,
256 lazyEmptyExtraneousGroupIdTable());
257 }
258
259 /**
260 * Returns the number of groups for the specified device in the store.
261 *
262 * @return number of groups for the specified device
263 */
264 @Override
265 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700266 return (getGroups(deviceId) != null) ?
267 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800268 }
269
270 /**
271 * Returns the groups associated with a device.
272 *
273 * @param deviceId the device ID
274 *
275 * @return the group entries
276 */
277 @Override
278 public Iterable<Group> getGroups(DeviceId deviceId) {
279 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700280 return FluentIterable.from(getGroupStoreKeyMap().values())
281 .filter(input -> input.deviceId().equals(deviceId))
282 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800283 }
284
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700285 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
286 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700287 return FluentIterable.from(getGroupStoreKeyMap().values())
288 .filter(input -> input.deviceId().equals(deviceId));
289 }
290
alshabib10580802015-02-18 18:30:33 -0800291 /**
292 * Returns the stored group entry.
293 *
294 * @param deviceId the device ID
295 * @param appCookie the group key
296 *
297 * @return a group associated with the key
298 */
299 @Override
300 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700301 return getStoredGroupEntry(deviceId, appCookie);
302 }
303
304 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
305 GroupKey appCookie) {
306 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
307 appCookie));
308 }
309
310 @Override
311 public Group getGroup(DeviceId deviceId, GroupId groupId) {
312 return getStoredGroupEntry(deviceId, groupId);
313 }
314
315 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
316 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700317 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800318 }
319
320 private int getFreeGroupIdValue(DeviceId deviceId) {
321 int freeId = groupIdGen.incrementAndGet();
322
323 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700324 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800325 if (existing == null) {
326 existing = (
327 extraneousGroupEntriesById.get(deviceId) != null) ?
328 extraneousGroupEntriesById.get(deviceId).
329 get(new DefaultGroupId(freeId)) :
330 null;
331 }
332 if (existing != null) {
333 freeId = groupIdGen.incrementAndGet();
334 } else {
335 break;
336 }
337 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700338 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800339 return freeId;
340 }
341
342 /**
343 * Stores a new group entry using the information from group description.
344 *
345 * @param groupDesc group description to be used to create group entry
346 */
347 @Override
348 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700349 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800350 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800351 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
352 if (existingGroup != null) {
Saurav Das4ce45962015-11-24 23:21:05 -0800353 log.warn("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800354 groupDesc.appCookie(), groupDesc.deviceId(),
355 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800356 return;
357 }
358
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700359 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700360 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700361 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700362 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700363 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
364 log.error("No Master for device {}..."
365 + "Can not perform add group operation",
366 groupDesc.deviceId());
367 //TODO: Send Group operation failure event
368 return;
369 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700370 GroupStoreMessage groupOp = GroupStoreMessage.
371 createGroupAddRequestMsg(groupDesc.deviceId(),
372 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700373
Madan Jampani175e8fd2015-05-20 14:10:45 -0700374 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700375 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700376 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700377 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
378 if (error != null) {
379 log.warn("Failed to send request to master: {} to {}",
380 groupOp,
381 mastershipService.getMasterFor(groupDesc.deviceId()));
382 //TODO: Send Group operation failure event
383 } else {
384 log.debug("Sent Group operation request for device {} "
385 + "to remote MASTER {}",
386 groupDesc.deviceId(),
387 mastershipService.getMasterFor(groupDesc.deviceId()));
388 }
389 });
alshabib10580802015-02-18 18:30:33 -0800390 return;
391 }
392
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700393 log.debug("Store group for device {} is getting handled locally",
394 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800395 storeGroupDescriptionInternal(groupDesc);
396 }
397
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700398 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
399 ConcurrentMap<GroupId, Group> extraneousMap =
400 extraneousGroupEntriesById.get(deviceId);
401 if (extraneousMap == null) {
402 return null;
403 }
404 return extraneousMap.get(new DefaultGroupId(groupId));
405 }
406
407 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
408 GroupBuckets buckets) {
409 ConcurrentMap<GroupId, Group> extraneousMap =
410 extraneousGroupEntriesById.get(deviceId);
411 if (extraneousMap == null) {
412 return null;
413 }
414
415 for (Group extraneousGroup:extraneousMap.values()) {
416 if (extraneousGroup.buckets().equals(buckets)) {
417 return extraneousGroup;
418 }
419 }
420 return null;
421 }
422
alshabib10580802015-02-18 18:30:33 -0800423 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
424 // Check if a group is existing with the same key
425 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
426 return;
427 }
428
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700429 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
430 // Device group audit has not completed yet
431 // Add this group description to pending group key table
432 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700433 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700434 groupDesc.deviceId());
435 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
436 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
437 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
438 getPendingGroupKeyTable();
439 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
440 groupDesc.appCookie()),
441 group);
442 return;
443 }
444
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700445 Group matchingExtraneousGroup = null;
446 if (groupDesc.givenGroupId() != null) {
447 //Check if there is a extraneous group existing with the same Id
448 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
449 groupDesc.deviceId(), groupDesc.givenGroupId());
450 if (matchingExtraneousGroup != null) {
451 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
452 groupDesc.deviceId(),
453 groupDesc.givenGroupId());
454 //Check if the group buckets matches with user provided buckets
455 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
456 //Group is already existing with the same buckets and Id
457 // Create a group entry object
458 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
459 groupDesc.deviceId(),
460 groupDesc.givenGroupId());
461 StoredGroupEntry group = new DefaultGroup(
462 matchingExtraneousGroup.id(), groupDesc);
463 // Insert the newly created group entry into key and id maps
464 getGroupStoreKeyMap().
465 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
466 groupDesc.appCookie()), group);
467 // Ensure it also inserted into group id based table to
468 // avoid any chances of duplication in group id generation
469 getGroupIdTable(groupDesc.deviceId()).
470 put(matchingExtraneousGroup.id(), group);
471 addOrUpdateGroupEntry(matchingExtraneousGroup);
472 removeExtraneousGroupEntry(matchingExtraneousGroup);
473 return;
474 } else {
475 //Group buckets are not matching. Update group
476 //with user provided buckets.
477 //TODO
478 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
479 groupDesc.deviceId(),
480 groupDesc.givenGroupId());
481 }
482 }
483 } else {
484 //Check if there is an extraneous group with user provided buckets
485 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
486 groupDesc.deviceId(), groupDesc.buckets());
487 if (matchingExtraneousGroup != null) {
488 //Group is already existing with the same buckets.
489 //So reuse this group.
490 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
491 groupDesc.deviceId());
492 //Create a group entry object
493 StoredGroupEntry group = new DefaultGroup(
494 matchingExtraneousGroup.id(), groupDesc);
495 // Insert the newly created group entry into key and id maps
496 getGroupStoreKeyMap().
497 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
498 groupDesc.appCookie()), group);
499 // Ensure it also inserted into group id based table to
500 // avoid any chances of duplication in group id generation
501 getGroupIdTable(groupDesc.deviceId()).
502 put(matchingExtraneousGroup.id(), group);
503 addOrUpdateGroupEntry(matchingExtraneousGroup);
504 removeExtraneousGroupEntry(matchingExtraneousGroup);
505 return;
506 } else {
507 //TODO: Check if there are any empty groups that can be used here
508 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
509 groupDesc.deviceId());
510 }
511 }
512
Saurav Das100e3b82015-04-30 11:12:10 -0700513 GroupId id = null;
514 if (groupDesc.givenGroupId() == null) {
515 // Get a new group identifier
516 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
517 } else {
518 id = new DefaultGroupId(groupDesc.givenGroupId());
519 }
alshabib10580802015-02-18 18:30:33 -0800520 // Create a group entry object
521 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700522 // Insert the newly created group entry into key and id maps
523 getGroupStoreKeyMap().
524 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
525 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700526 // Ensure it also inserted into group id based table to
527 // avoid any chances of duplication in group id generation
528 getGroupIdTable(groupDesc.deviceId()).
529 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700530 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
531 id,
532 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800533 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
534 group));
535 }
536
537 /**
538 * Updates the existing group entry with the information
539 * from group description.
540 *
541 * @param deviceId the device ID
542 * @param oldAppCookie the current group key
543 * @param type update type
544 * @param newBuckets group buckets for updates
545 * @param newAppCookie optional new group key
546 */
547 @Override
548 public void updateGroupDescription(DeviceId deviceId,
549 GroupKey oldAppCookie,
550 UpdateType type,
551 GroupBuckets newBuckets,
552 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700553 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700554 if (mastershipService.getMasterFor(deviceId) != null &&
555 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700556 log.debug("updateGroupDescription: Device {} local role is not MASTER",
557 deviceId);
558 if (mastershipService.getMasterFor(deviceId) == null) {
559 log.error("No Master for device {}..."
560 + "Can not perform update group operation",
561 deviceId);
562 //TODO: Send Group operation failure event
563 return;
564 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700565 GroupStoreMessage groupOp = GroupStoreMessage.
566 createGroupUpdateRequestMsg(deviceId,
567 oldAppCookie,
568 type,
569 newBuckets,
570 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700571
Madan Jampani175e8fd2015-05-20 14:10:45 -0700572 clusterCommunicator.unicast(groupOp,
573 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700574 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700575 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
576 if (error != null) {
577 log.warn("Failed to send request to master: {} to {}",
578 groupOp,
579 mastershipService.getMasterFor(deviceId), error);
580 }
581 //TODO: Send Group operation failure event
582 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700583 return;
584 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700585 log.debug("updateGroupDescription for device {} is getting handled locally",
586 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700587 updateGroupDescriptionInternal(deviceId,
588 oldAppCookie,
589 type,
590 newBuckets,
591 newAppCookie);
592 }
593
594 private void updateGroupDescriptionInternal(DeviceId deviceId,
595 GroupKey oldAppCookie,
596 UpdateType type,
597 GroupBuckets newBuckets,
598 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800599 // Check if a group is existing with the provided key
600 Group oldGroup = getGroup(deviceId, oldAppCookie);
601 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700602 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800603 return;
604 }
605
606 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
607 type,
608 newBuckets);
609 if (newBucketList != null) {
610 // Create a new group object from the old group
611 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
612 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
613 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
614 oldGroup.deviceId(),
615 oldGroup.type(),
616 updatedBuckets,
617 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700618 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800619 oldGroup.appId());
620 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
621 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700622 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
623 oldGroup.id(),
624 oldGroup.deviceId(),
625 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800626 newGroup.setState(GroupState.PENDING_UPDATE);
627 newGroup.setLife(oldGroup.life());
628 newGroup.setPackets(oldGroup.packets());
629 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700630 //Update the group entry in groupkey based map.
631 //Update to groupid based map will happen in the
632 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700633 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
634 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700635 getGroupStoreKeyMap().
636 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
637 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800638 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700639 } else {
640 log.warn("updateGroupDescriptionInternal with type {}: No "
641 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800642 }
643 }
644
645 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
646 UpdateType type,
647 GroupBuckets buckets) {
648 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700649 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800650 boolean groupDescUpdated = false;
651
652 if (type == UpdateType.ADD) {
653 // Check if the any of the new buckets are part of
654 // the old bucket list
655 for (GroupBucket addBucket:buckets.buckets()) {
656 if (!newBucketList.contains(addBucket)) {
657 newBucketList.add(addBucket);
658 groupDescUpdated = true;
659 }
660 }
661 } else if (type == UpdateType.REMOVE) {
662 // Check if the to be removed buckets are part of the
663 // old bucket list
664 for (GroupBucket removeBucket:buckets.buckets()) {
665 if (newBucketList.contains(removeBucket)) {
666 newBucketList.remove(removeBucket);
667 groupDescUpdated = true;
668 }
669 }
670 }
671
672 if (groupDescUpdated) {
673 return newBucketList;
674 } else {
675 return null;
676 }
677 }
678
679 /**
680 * Triggers deleting the existing group entry.
681 *
682 * @param deviceId the device ID
683 * @param appCookie the group key
684 */
685 @Override
686 public void deleteGroupDescription(DeviceId deviceId,
687 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700688 // Check if group to be deleted by a remote instance
689 if (mastershipService.
690 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700691 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
692 deviceId);
693 if (mastershipService.getMasterFor(deviceId) == null) {
694 log.error("No Master for device {}..."
695 + "Can not perform delete group operation",
696 deviceId);
697 //TODO: Send Group operation failure event
698 return;
699 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700700 GroupStoreMessage groupOp = GroupStoreMessage.
701 createGroupDeleteRequestMsg(deviceId,
702 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700703
Madan Jampani175e8fd2015-05-20 14:10:45 -0700704 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700705 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700706 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700707 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
708 if (error != null) {
709 log.warn("Failed to send request to master: {} to {}",
710 groupOp,
711 mastershipService.getMasterFor(deviceId), error);
712 }
713 //TODO: Send Group operation failure event
714 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700715 return;
716 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700717 log.debug("deleteGroupDescription in device {} is getting handled locally",
718 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700719 deleteGroupDescriptionInternal(deviceId, appCookie);
720 }
721
722 private void deleteGroupDescriptionInternal(DeviceId deviceId,
723 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800724 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700725 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800726 if (existing == null) {
727 return;
728 }
729
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700730 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
731 existing.id(),
732 existing.deviceId(),
733 existing.state());
alshabib10580802015-02-18 18:30:33 -0800734 synchronized (existing) {
735 existing.setState(GroupState.PENDING_DELETE);
736 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700737 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
738 deviceId);
alshabib10580802015-02-18 18:30:33 -0800739 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
740 }
741
742 /**
743 * Stores a new group entry, or updates an existing entry.
744 *
745 * @param group group entry
746 */
747 @Override
748 public void addOrUpdateGroupEntry(Group group) {
749 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700750 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
751 group.id());
alshabib10580802015-02-18 18:30:33 -0800752 GroupEvent event = null;
753
754 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700755 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700756 group.id(),
757 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800758 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700759 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700760 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700761 existing.buckets().buckets()
762 .stream()
763 .filter((existingBucket)->(existingBucket.equals(bucket)))
764 .findFirst();
765 if (matchingBucket.isPresent()) {
766 ((StoredGroupBucketEntry) matchingBucket.
767 get()).setPackets(bucket.packets());
768 ((StoredGroupBucketEntry) matchingBucket.
769 get()).setBytes(bucket.bytes());
770 } else {
771 log.warn("addOrUpdateGroupEntry: No matching "
772 + "buckets to update stats");
773 }
774 }
alshabib10580802015-02-18 18:30:33 -0800775 existing.setLife(group.life());
776 existing.setPackets(group.packets());
777 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700778 if ((existing.state() == GroupState.PENDING_ADD) ||
779 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700780 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
781 existing.id(),
782 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700783 existing.state());
alshabib10580802015-02-18 18:30:33 -0800784 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700785 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800786 event = new GroupEvent(Type.GROUP_ADDED, existing);
787 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700788 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
789 existing.id(),
790 existing.deviceId(),
791 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700792 existing.setState(GroupState.ADDED);
793 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800794 event = new GroupEvent(Type.GROUP_UPDATED, existing);
795 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700796 //Re-PUT map entries to trigger map update events
797 getGroupStoreKeyMap().
798 put(new GroupStoreKeyMapKey(existing.deviceId(),
799 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800800 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700801 } else {
802 log.warn("addOrUpdateGroupEntry: Group update "
803 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800804 }
805
806 if (event != null) {
807 notifyDelegate(event);
808 }
809 }
810
811 /**
812 * Removes the group entry from store.
813 *
814 * @param group group entry
815 */
816 @Override
817 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700818 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
819 group.id());
alshabib10580802015-02-18 18:30:33 -0800820
821 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700822 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700823 group.id(),
824 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700825 //Removal from groupid based map will happen in the
826 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700827 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
828 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800829 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700830 } else {
831 log.warn("removeGroupEntry for {} in device{} is "
832 + "not existing in our maps",
833 group.id(),
834 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800835 }
836 }
837
838 @Override
839 public void deviceInitialAuditCompleted(DeviceId deviceId,
840 boolean completed) {
841 synchronized (deviceAuditStatus) {
842 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700843 log.debug("AUDIT completed for device {}",
844 deviceId);
alshabib10580802015-02-18 18:30:33 -0800845 deviceAuditStatus.put(deviceId, true);
846 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700847 List<StoredGroupEntry> pendingGroupRequests =
848 getPendingGroupKeyTable().values()
849 .stream()
850 .filter(g-> g.deviceId().equals(deviceId))
851 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700852 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700853 deviceId,
854 pendingGroupRequests.size());
855 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800856 GroupDescription tmp = new DefaultGroupDescription(
857 group.deviceId(),
858 group.type(),
859 group.buckets(),
860 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700861 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800862 group.appId());
863 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700864 getPendingGroupKeyTable().
865 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800866 }
alshabib10580802015-02-18 18:30:33 -0800867 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700868 Boolean audited = deviceAuditStatus.get(deviceId);
869 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700870 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800871 deviceAuditStatus.put(deviceId, false);
872 }
873 }
874 }
875 }
876
877 @Override
878 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
879 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700880 Boolean audited = deviceAuditStatus.get(deviceId);
881 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800882 }
883 }
884
885 @Override
886 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
887
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700888 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
889 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800890
891 if (existing == null) {
892 log.warn("No group entry with ID {} found ", operation.groupId());
893 return;
894 }
895
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700896 log.warn("groupOperationFailed: group operation {} failed"
897 + "for group {} in device {}",
898 operation.opType(),
899 existing.id(),
900 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800901 switch (operation.opType()) {
902 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700903 if (existing.state() == GroupState.PENDING_ADD) {
904 //TODO: Need to add support for passing the group
905 //operation failure reason from group provider.
906 //If the error type is anything other than GROUP_EXISTS,
907 //then the GROUP_ADD_FAILED event should be raised even
908 //in PENDING_ADD_RETRY state also.
909 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
910 log.warn("groupOperationFailed: cleaningup "
911 + "group {} from store in device {}....",
912 existing.id(),
913 existing.deviceId());
914 //Removal from groupid based map will happen in the
915 //map update listener
916 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
917 existing.appCookie()));
918 }
alshabib10580802015-02-18 18:30:33 -0800919 break;
920 case MODIFY:
921 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
922 break;
923 case DELETE:
924 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
925 break;
926 default:
927 log.warn("Unknown group operation type {}", operation.opType());
928 }
alshabib10580802015-02-18 18:30:33 -0800929 }
930
931 @Override
932 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700933 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700934 group.id(),
935 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800936 ConcurrentMap<GroupId, Group> extraneousIdTable =
937 getExtraneousGroupIdTable(group.deviceId());
938 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700939 // Don't remove the extraneous groups, instead re-use it when
940 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800941 }
942
943 @Override
944 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700945 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700946 group.id(),
947 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800948 ConcurrentMap<GroupId, Group> extraneousIdTable =
949 getExtraneousGroupIdTable(group.deviceId());
950 extraneousIdTable.remove(group.id());
951 }
952
953 @Override
954 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
955 // flatten and make iterator unmodifiable
956 return FluentIterable.from(
957 getExtraneousGroupIdTable(deviceId).values());
958 }
959
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700960 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700961 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700962 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700963 private class GroupStoreKeyMapListener implements
964 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700965
966 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700967 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700968 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700969 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700970 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700971 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700972 if ((key == null) && (group == null)) {
973 log.error("GroupStoreKeyMapListener: Received "
974 + "event {} with null entry", mapEvent.type());
975 return;
976 } else if (group == null) {
977 group = getGroupIdTable(key.deviceId()).values()
978 .stream()
979 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
980 .findFirst().get();
981 if (group == null) {
982 log.error("GroupStoreKeyMapListener: Received "
983 + "event {} with null entry... can not process", mapEvent.type());
984 return;
985 }
986 }
987 log.trace("received groupid map event {} for id {} in device {}",
988 mapEvent.type(),
989 group.id(),
990 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700991 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700992 // Update the group ID table
993 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700994 if (mapEvent.value().state() == Group.GroupState.ADDED) {
995 if (mapEvent.value().isGroupStateAddedFirstTime()) {
996 groupEvent = new GroupEvent(Type.GROUP_ADDED,
997 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700998 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
999 group.id(),
1000 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001001 } else {
1002 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1003 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001004 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1005 group.id(),
1006 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001007 }
1008 }
1009 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001010 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001011 // Remove the entry from the group ID table
1012 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001013 }
1014
1015 if (groupEvent != null) {
1016 notifyDelegate(groupEvent);
1017 }
1018 }
1019 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001020
1021 private void process(GroupStoreMessage groupOp) {
1022 log.debug("Received remote group operation {} request for device {}",
1023 groupOp.type(),
1024 groupOp.deviceId());
1025 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1026 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1027 return;
1028 }
1029 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1030 storeGroupDescriptionInternal(groupOp.groupDesc());
1031 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1032 updateGroupDescriptionInternal(groupOp.deviceId(),
1033 groupOp.appCookie(),
1034 groupOp.updateType(),
1035 groupOp.updateBuckets(),
1036 groupOp.newAppCookie());
1037 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1038 deleteGroupDescriptionInternal(groupOp.deviceId(),
1039 groupOp.appCookie());
1040 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001041 }
1042
1043 /**
1044 * Flattened map key to be used to store group entries.
1045 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001046 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001047 private final DeviceId deviceId;
1048
1049 public GroupStoreMapKey(DeviceId deviceId) {
1050 this.deviceId = deviceId;
1051 }
1052
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001053 public DeviceId deviceId() {
1054 return deviceId;
1055 }
1056
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001057 @Override
1058 public boolean equals(Object o) {
1059 if (this == o) {
1060 return true;
1061 }
1062 if (!(o instanceof GroupStoreMapKey)) {
1063 return false;
1064 }
1065 GroupStoreMapKey that = (GroupStoreMapKey) o;
1066 return this.deviceId.equals(that.deviceId);
1067 }
1068
1069 @Override
1070 public int hashCode() {
1071 int result = 17;
1072
1073 result = 31 * result + Objects.hash(this.deviceId);
1074
1075 return result;
1076 }
1077 }
1078
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001079 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001080 private final GroupKey appCookie;
1081 public GroupStoreKeyMapKey(DeviceId deviceId,
1082 GroupKey appCookie) {
1083 super(deviceId);
1084 this.appCookie = appCookie;
1085 }
1086
1087 @Override
1088 public boolean equals(Object o) {
1089 if (this == o) {
1090 return true;
1091 }
1092 if (!(o instanceof GroupStoreKeyMapKey)) {
1093 return false;
1094 }
1095 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1096 return (super.equals(that) &&
1097 this.appCookie.equals(that.appCookie));
1098 }
1099
1100 @Override
1101 public int hashCode() {
1102 int result = 17;
1103
1104 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1105
1106 return result;
1107 }
1108 }
1109
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001110 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001111 private final GroupId groupId;
1112 public GroupStoreIdMapKey(DeviceId deviceId,
1113 GroupId groupId) {
1114 super(deviceId);
1115 this.groupId = groupId;
1116 }
1117
1118 @Override
1119 public boolean equals(Object o) {
1120 if (this == o) {
1121 return true;
1122 }
1123 if (!(o instanceof GroupStoreIdMapKey)) {
1124 return false;
1125 }
1126 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1127 return (super.equals(that) &&
1128 this.groupId.equals(that.groupId));
1129 }
1130
1131 @Override
1132 public int hashCode() {
1133 int result = 17;
1134
1135 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1136
1137 return result;
1138 }
1139 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001140
1141 @Override
1142 public void pushGroupMetrics(DeviceId deviceId,
1143 Collection<Group> groupEntries) {
1144 boolean deviceInitialAuditStatus =
1145 deviceInitialAuditStatus(deviceId);
1146 Set<Group> southboundGroupEntries =
1147 Sets.newHashSet(groupEntries);
1148 Set<StoredGroupEntry> storedGroupEntries =
1149 Sets.newHashSet(getStoredGroups(deviceId));
1150 Set<Group> extraneousStoredEntries =
1151 Sets.newHashSet(getExtraneousGroups(deviceId));
1152
1153 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1154 southboundGroupEntries.size(),
1155 deviceId);
1156 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1157 Group group = it.next();
1158 log.trace("Group {} in device {}", group, deviceId);
1159 }
1160
1161 log.trace("Displaying all ({}) stored group entries for device {}",
1162 storedGroupEntries.size(),
1163 deviceId);
1164 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1165 it1.hasNext();) {
1166 Group group = it1.next();
1167 log.trace("Stored Group {} for device {}", group, deviceId);
1168 }
1169
1170 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1171 Group group = it2.next();
1172 if (storedGroupEntries.remove(group)) {
1173 // we both have the group, let's update some info then.
1174 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1175 group.id(), deviceId);
1176 groupAdded(group);
1177 it2.remove();
1178 }
1179 }
1180 for (Group group : southboundGroupEntries) {
1181 if (getGroup(group.deviceId(), group.id()) != null) {
1182 // There is a group existing with the same id
1183 // It is possible that group update is
1184 // in progress while we got a stale info from switch
1185 if (!storedGroupEntries.remove(getGroup(
1186 group.deviceId(), group.id()))) {
1187 log.warn("Group AUDIT: Inconsistent state:"
1188 + "Group exists in ID based table while "
1189 + "not present in key based table");
1190 }
1191 } else {
1192 // there are groups in the switch that aren't in the store
1193 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1194 group.id(), deviceId);
1195 extraneousStoredEntries.remove(group);
1196 extraneousGroup(group);
1197 }
1198 }
1199 for (Group group : storedGroupEntries) {
1200 // there are groups in the store that aren't in the switch
1201 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1202 group.id(), deviceId);
1203 groupMissing(group);
1204 }
1205 for (Group group : extraneousStoredEntries) {
1206 // there are groups in the extraneous store that
1207 // aren't in the switch
1208 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1209 group.id(), deviceId);
1210 removeExtraneousGroupEntry(group);
1211 }
1212
1213 if (!deviceInitialAuditStatus) {
1214 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1215 deviceId);
1216 deviceInitialAuditCompleted(deviceId, true);
1217 }
1218 }
1219
1220 private void groupMissing(Group group) {
1221 switch (group.state()) {
1222 case PENDING_DELETE:
1223 log.debug("Group {} delete confirmation from device {}",
1224 group, group.deviceId());
1225 removeGroupEntry(group);
1226 break;
1227 case ADDED:
1228 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001229 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001230 case PENDING_UPDATE:
1231 log.debug("Group {} is in store but not on device {}",
1232 group, group.deviceId());
1233 StoredGroupEntry existing =
1234 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001235 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001236 existing.id(),
1237 existing.deviceId(),
1238 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001239 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001240 //Re-PUT map entries to trigger map update events
1241 getGroupStoreKeyMap().
1242 put(new GroupStoreKeyMapKey(existing.deviceId(),
1243 existing.appCookie()), existing);
1244 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1245 group));
1246 break;
1247 default:
1248 log.debug("Group {} has not been installed.", group);
1249 break;
1250 }
1251 }
1252
1253 private void extraneousGroup(Group group) {
1254 log.debug("Group {} is on device {} but not in store.",
1255 group, group.deviceId());
1256 addOrUpdateExtraneousGroupEntry(group);
1257 }
1258
1259 private void groupAdded(Group group) {
1260 log.trace("Group {} Added or Updated in device {}",
1261 group, group.deviceId());
1262 addOrUpdateGroupEntry(group);
1263 }
alshabib10580802015-02-18 18:30:33 -08001264}