blob: 4f4c06faaa75e5d468d963e16a2844cb9c51bde3 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070022
alshabib10580802015-02-18 18:30:33 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080028import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070029import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080030import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070031import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080032import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.core.DefaultGroupId;
34import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070057import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070059import org.onosproject.store.service.EventuallyConsistentMap;
60import org.onosproject.store.service.EventuallyConsistentMapBuilder;
61import org.onosproject.store.service.EventuallyConsistentMapEvent;
62import org.onosproject.store.service.EventuallyConsistentMapListener;
63import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080064import org.slf4j.Logger;
65
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080068import java.util.Collections;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070070import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070071import java.util.List;
72import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070073import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070074import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070075import java.util.concurrent.ConcurrentHashMap;
76import java.util.concurrent.ConcurrentMap;
77import java.util.concurrent.ExecutorService;
78import java.util.concurrent.Executors;
79import java.util.concurrent.atomic.AtomicInteger;
80import java.util.concurrent.atomic.AtomicLong;
81import java.util.stream.Collectors;
82
83import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
84import static org.onlab.util.Tools.groupedThreads;
85import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080086
87/**
88 * Manages inventory of group entries using trivial in-memory implementation.
89 */
90@Component(immediate = true)
91@Service
92public class DistributedGroupStore
93 extends AbstractStore<GroupEvent, GroupStoreDelegate>
94 implements GroupStore {
95
96 private final Logger log = getLogger(getClass());
97
98 private final int dummyId = 0xffffffff;
99 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
100
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterCommunicationService clusterCommunicator;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700108 protected StorageService storageService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700111 protected MastershipService mastershipService;
112
113 // Per device group table with (device id + app cookie) as key
114 private EventuallyConsistentMap<GroupStoreKeyMapKey,
115 StoredGroupEntry> groupStoreEntriesByKey = null;
116 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700117 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
118 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700119 private EventuallyConsistentMap<GroupStoreKeyMapKey,
120 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800121 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
122 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700123 private ExecutorService messageHandlingExecutor;
124 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800125
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700126 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800127
128 private final AtomicInteger groupIdGen = new AtomicInteger();
129
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700130 private KryoNamespace.Builder kryoBuilder = null;
131
Madan Jampanibcf1a482015-06-24 19:05:56 -0700132 private final AtomicLong sequenceNumber = new AtomicLong(0);
133
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700134 private KryoNamespace clusterMsgSerializer;
135
alshabib10580802015-02-18 18:30:33 -0800136 @Activate
137 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700139 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700140 .register(DefaultGroup.class,
141 DefaultGroupBucket.class,
142 DefaultGroupDescription.class,
143 DefaultGroupKey.class,
144 GroupDescription.Type.class,
145 Group.GroupState.class,
146 GroupBuckets.class,
147 DefaultGroupId.class,
148 GroupStoreMessage.class,
149 GroupStoreMessage.Type.class,
150 UpdateType.class,
151 GroupStoreMessageSubjects.class,
152 MultiValuedTimestamp.class,
153 GroupStoreKeyMapKey.class,
154 GroupStoreIdMapKey.class,
155 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700156 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700157
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700158 clusterMsgSerializer = kryoBuilder.build();
159
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700160 messageHandlingExecutor = Executors.
161 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
162 groupedThreads("onos/store/group",
163 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700164
165 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700166 clusterMsgSerializer::deserialize,
Madan Jampani01e05fb2015-08-13 13:29:36 -0700167 this::process,
168 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700169
170 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700171 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
172 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
173
174 groupStoreEntriesByKey = keyMapBuilder
175 .withName("groupstorekeymap")
176 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700177 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
178 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700179 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700180 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700181 log.debug("Current size of groupstorekeymap:{}",
182 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700183
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700184 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700185 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
186 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
187
188 auditPendingReqQueue = auditMapBuilder
189 .withName("pendinggroupkeymap")
190 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700191 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
192 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700193 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700194 log.debug("Current size of pendinggroupkeymap:{}",
195 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700196
alshabib10580802015-02-18 18:30:33 -0800197 log.info("Started");
198 }
199
200 @Deactivate
201 public void deactivate() {
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700202 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700203 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700204 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800205 log.info("Stopped");
206 }
207
alshabib10580802015-02-18 18:30:33 -0800208 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700209 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800210 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
211 }
212
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700213 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
214 lazyEmptyGroupIdTable() {
215 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
216 }
217
alshabib10580802015-02-18 18:30:33 -0800218 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700219 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800220 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700221 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800222 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700223 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
224 getGroupStoreKeyMap() {
225 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800226 }
227
228 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700229 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800230 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700231 * @param deviceId identifier of the device
232 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800233 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700234 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
235 return createIfAbsentUnchecked(groupEntriesById,
236 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800237 }
238
239 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700240 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800241 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800243 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700244 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
245 getPendingGroupKeyTable() {
246 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800247 }
248
249 /**
250 * Returns the extraneous group id table for specified device.
251 *
252 * @param deviceId identifier of the device
253 * @return Map representing group key table of given device.
254 */
255 private ConcurrentMap<GroupId, Group>
256 getExtraneousGroupIdTable(DeviceId deviceId) {
257 return createIfAbsentUnchecked(extraneousGroupEntriesById,
258 deviceId,
259 lazyEmptyExtraneousGroupIdTable());
260 }
261
262 /**
263 * Returns the number of groups for the specified device in the store.
264 *
265 * @return number of groups for the specified device
266 */
267 @Override
268 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700269 return (getGroups(deviceId) != null) ?
270 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800271 }
272
273 /**
274 * Returns the groups associated with a device.
275 *
276 * @param deviceId the device ID
277 *
278 * @return the group entries
279 */
280 @Override
281 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800282 // Let ImmutableSet.copyOf do the type conversion
283 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800284 }
285
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700286 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800287 NodeId master = mastershipService.getMasterFor(deviceId);
288 if (master == null) {
289 log.debug("Failed to getGroups: No master for {}", deviceId);
290 return Collections.emptySet();
291 }
292
293 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
294 .stream()
295 .filter(input -> input.deviceId().equals(deviceId))
296 .collect(Collectors.toSet());
297 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700298 }
299
alshabib10580802015-02-18 18:30:33 -0800300 /**
301 * Returns the stored group entry.
302 *
303 * @param deviceId the device ID
304 * @param appCookie the group key
305 *
306 * @return a group associated with the key
307 */
308 @Override
309 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700310 return getStoredGroupEntry(deviceId, appCookie);
311 }
312
313 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
314 GroupKey appCookie) {
315 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
316 appCookie));
317 }
318
319 @Override
320 public Group getGroup(DeviceId deviceId, GroupId groupId) {
321 return getStoredGroupEntry(deviceId, groupId);
322 }
323
324 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
325 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700326 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800327 }
328
329 private int getFreeGroupIdValue(DeviceId deviceId) {
330 int freeId = groupIdGen.incrementAndGet();
331
332 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700333 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800334 if (existing == null) {
335 existing = (
336 extraneousGroupEntriesById.get(deviceId) != null) ?
337 extraneousGroupEntriesById.get(deviceId).
338 get(new DefaultGroupId(freeId)) :
339 null;
340 }
341 if (existing != null) {
342 freeId = groupIdGen.incrementAndGet();
343 } else {
344 break;
345 }
346 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700347 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800348 return freeId;
349 }
350
351 /**
352 * Stores a new group entry using the information from group description.
353 *
354 * @param groupDesc group description to be used to create group entry
355 */
356 @Override
357 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700358 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800359 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800360 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
361 if (existingGroup != null) {
Saurav Das4ce45962015-11-24 23:21:05 -0800362 log.warn("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800363 groupDesc.appCookie(), groupDesc.deviceId(),
364 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800365 return;
366 }
367
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700368 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700369 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700370 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700371 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700372 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
373 log.error("No Master for device {}..."
374 + "Can not perform add group operation",
375 groupDesc.deviceId());
376 //TODO: Send Group operation failure event
377 return;
378 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700379 GroupStoreMessage groupOp = GroupStoreMessage.
380 createGroupAddRequestMsg(groupDesc.deviceId(),
381 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700382
Madan Jampani175e8fd2015-05-20 14:10:45 -0700383 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700384 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700385 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700386 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
387 if (error != null) {
388 log.warn("Failed to send request to master: {} to {}",
389 groupOp,
390 mastershipService.getMasterFor(groupDesc.deviceId()));
391 //TODO: Send Group operation failure event
392 } else {
393 log.debug("Sent Group operation request for device {} "
394 + "to remote MASTER {}",
395 groupDesc.deviceId(),
396 mastershipService.getMasterFor(groupDesc.deviceId()));
397 }
398 });
alshabib10580802015-02-18 18:30:33 -0800399 return;
400 }
401
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700402 log.debug("Store group for device {} is getting handled locally",
403 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800404 storeGroupDescriptionInternal(groupDesc);
405 }
406
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700407 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
408 ConcurrentMap<GroupId, Group> extraneousMap =
409 extraneousGroupEntriesById.get(deviceId);
410 if (extraneousMap == null) {
411 return null;
412 }
413 return extraneousMap.get(new DefaultGroupId(groupId));
414 }
415
416 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
417 GroupBuckets buckets) {
418 ConcurrentMap<GroupId, Group> extraneousMap =
419 extraneousGroupEntriesById.get(deviceId);
420 if (extraneousMap == null) {
421 return null;
422 }
423
424 for (Group extraneousGroup:extraneousMap.values()) {
425 if (extraneousGroup.buckets().equals(buckets)) {
426 return extraneousGroup;
427 }
428 }
429 return null;
430 }
431
alshabib10580802015-02-18 18:30:33 -0800432 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
433 // Check if a group is existing with the same key
434 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
435 return;
436 }
437
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700438 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
439 // Device group audit has not completed yet
440 // Add this group description to pending group key table
441 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700442 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700443 groupDesc.deviceId());
444 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
445 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
446 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
447 getPendingGroupKeyTable();
448 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
449 groupDesc.appCookie()),
450 group);
451 return;
452 }
453
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700454 Group matchingExtraneousGroup = null;
455 if (groupDesc.givenGroupId() != null) {
456 //Check if there is a extraneous group existing with the same Id
457 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
458 groupDesc.deviceId(), groupDesc.givenGroupId());
459 if (matchingExtraneousGroup != null) {
460 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
461 groupDesc.deviceId(),
462 groupDesc.givenGroupId());
463 //Check if the group buckets matches with user provided buckets
464 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
465 //Group is already existing with the same buckets and Id
466 // Create a group entry object
467 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
468 groupDesc.deviceId(),
469 groupDesc.givenGroupId());
470 StoredGroupEntry group = new DefaultGroup(
471 matchingExtraneousGroup.id(), groupDesc);
472 // Insert the newly created group entry into key and id maps
473 getGroupStoreKeyMap().
474 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
475 groupDesc.appCookie()), group);
476 // Ensure it also inserted into group id based table to
477 // avoid any chances of duplication in group id generation
478 getGroupIdTable(groupDesc.deviceId()).
479 put(matchingExtraneousGroup.id(), group);
480 addOrUpdateGroupEntry(matchingExtraneousGroup);
481 removeExtraneousGroupEntry(matchingExtraneousGroup);
482 return;
483 } else {
484 //Group buckets are not matching. Update group
485 //with user provided buckets.
486 //TODO
487 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
488 groupDesc.deviceId(),
489 groupDesc.givenGroupId());
490 }
491 }
492 } else {
493 //Check if there is an extraneous group with user provided buckets
494 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
495 groupDesc.deviceId(), groupDesc.buckets());
496 if (matchingExtraneousGroup != null) {
497 //Group is already existing with the same buckets.
498 //So reuse this group.
499 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
500 groupDesc.deviceId());
501 //Create a group entry object
502 StoredGroupEntry group = new DefaultGroup(
503 matchingExtraneousGroup.id(), groupDesc);
504 // Insert the newly created group entry into key and id maps
505 getGroupStoreKeyMap().
506 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
507 groupDesc.appCookie()), group);
508 // Ensure it also inserted into group id based table to
509 // avoid any chances of duplication in group id generation
510 getGroupIdTable(groupDesc.deviceId()).
511 put(matchingExtraneousGroup.id(), group);
512 addOrUpdateGroupEntry(matchingExtraneousGroup);
513 removeExtraneousGroupEntry(matchingExtraneousGroup);
514 return;
515 } else {
516 //TODO: Check if there are any empty groups that can be used here
517 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
518 groupDesc.deviceId());
519 }
520 }
521
Saurav Das100e3b82015-04-30 11:12:10 -0700522 GroupId id = null;
523 if (groupDesc.givenGroupId() == null) {
524 // Get a new group identifier
525 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
526 } else {
527 id = new DefaultGroupId(groupDesc.givenGroupId());
528 }
alshabib10580802015-02-18 18:30:33 -0800529 // Create a group entry object
530 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700531 // Insert the newly created group entry into key and id maps
532 getGroupStoreKeyMap().
533 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
534 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700535 // Ensure it also inserted into group id based table to
536 // avoid any chances of duplication in group id generation
537 getGroupIdTable(groupDesc.deviceId()).
538 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700539 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
540 id,
541 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800542 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
543 group));
544 }
545
546 /**
547 * Updates the existing group entry with the information
548 * from group description.
549 *
550 * @param deviceId the device ID
551 * @param oldAppCookie the current group key
552 * @param type update type
553 * @param newBuckets group buckets for updates
554 * @param newAppCookie optional new group key
555 */
556 @Override
557 public void updateGroupDescription(DeviceId deviceId,
558 GroupKey oldAppCookie,
559 UpdateType type,
560 GroupBuckets newBuckets,
561 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700562 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700563 if (mastershipService.getMasterFor(deviceId) != null &&
564 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700565 log.debug("updateGroupDescription: Device {} local role is not MASTER",
566 deviceId);
567 if (mastershipService.getMasterFor(deviceId) == null) {
568 log.error("No Master for device {}..."
569 + "Can not perform update group operation",
570 deviceId);
571 //TODO: Send Group operation failure event
572 return;
573 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700574 GroupStoreMessage groupOp = GroupStoreMessage.
575 createGroupUpdateRequestMsg(deviceId,
576 oldAppCookie,
577 type,
578 newBuckets,
579 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700580
Madan Jampani175e8fd2015-05-20 14:10:45 -0700581 clusterCommunicator.unicast(groupOp,
582 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700583 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700584 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
585 if (error != null) {
586 log.warn("Failed to send request to master: {} to {}",
587 groupOp,
588 mastershipService.getMasterFor(deviceId), error);
589 }
590 //TODO: Send Group operation failure event
591 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700592 return;
593 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700594 log.debug("updateGroupDescription for device {} is getting handled locally",
595 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700596 updateGroupDescriptionInternal(deviceId,
597 oldAppCookie,
598 type,
599 newBuckets,
600 newAppCookie);
601 }
602
603 private void updateGroupDescriptionInternal(DeviceId deviceId,
604 GroupKey oldAppCookie,
605 UpdateType type,
606 GroupBuckets newBuckets,
607 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800608 // Check if a group is existing with the provided key
609 Group oldGroup = getGroup(deviceId, oldAppCookie);
610 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700611 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800612 return;
613 }
614
615 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
616 type,
617 newBuckets);
618 if (newBucketList != null) {
619 // Create a new group object from the old group
620 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
621 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
622 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
623 oldGroup.deviceId(),
624 oldGroup.type(),
625 updatedBuckets,
626 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700627 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800628 oldGroup.appId());
629 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
630 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700631 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
632 oldGroup.id(),
633 oldGroup.deviceId(),
634 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800635 newGroup.setState(GroupState.PENDING_UPDATE);
636 newGroup.setLife(oldGroup.life());
637 newGroup.setPackets(oldGroup.packets());
638 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700639 //Update the group entry in groupkey based map.
640 //Update to groupid based map will happen in the
641 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700642 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
643 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700644 getGroupStoreKeyMap().
645 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
646 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800647 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700648 } else {
649 log.warn("updateGroupDescriptionInternal with type {}: No "
650 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800651 }
652 }
653
654 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
655 UpdateType type,
656 GroupBuckets buckets) {
657 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700658 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800659 boolean groupDescUpdated = false;
660
661 if (type == UpdateType.ADD) {
662 // Check if the any of the new buckets are part of
663 // the old bucket list
664 for (GroupBucket addBucket:buckets.buckets()) {
665 if (!newBucketList.contains(addBucket)) {
666 newBucketList.add(addBucket);
667 groupDescUpdated = true;
668 }
669 }
670 } else if (type == UpdateType.REMOVE) {
671 // Check if the to be removed buckets are part of the
672 // old bucket list
673 for (GroupBucket removeBucket:buckets.buckets()) {
674 if (newBucketList.contains(removeBucket)) {
675 newBucketList.remove(removeBucket);
676 groupDescUpdated = true;
677 }
678 }
679 }
680
681 if (groupDescUpdated) {
682 return newBucketList;
683 } else {
684 return null;
685 }
686 }
687
688 /**
689 * Triggers deleting the existing group entry.
690 *
691 * @param deviceId the device ID
692 * @param appCookie the group key
693 */
694 @Override
695 public void deleteGroupDescription(DeviceId deviceId,
696 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700697 // Check if group to be deleted by a remote instance
698 if (mastershipService.
699 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700700 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
701 deviceId);
702 if (mastershipService.getMasterFor(deviceId) == null) {
703 log.error("No Master for device {}..."
704 + "Can not perform delete group operation",
705 deviceId);
706 //TODO: Send Group operation failure event
707 return;
708 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700709 GroupStoreMessage groupOp = GroupStoreMessage.
710 createGroupDeleteRequestMsg(deviceId,
711 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700712
Madan Jampani175e8fd2015-05-20 14:10:45 -0700713 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700714 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700715 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700716 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
717 if (error != null) {
718 log.warn("Failed to send request to master: {} to {}",
719 groupOp,
720 mastershipService.getMasterFor(deviceId), error);
721 }
722 //TODO: Send Group operation failure event
723 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700724 return;
725 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700726 log.debug("deleteGroupDescription in device {} is getting handled locally",
727 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700728 deleteGroupDescriptionInternal(deviceId, appCookie);
729 }
730
731 private void deleteGroupDescriptionInternal(DeviceId deviceId,
732 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800733 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700734 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800735 if (existing == null) {
736 return;
737 }
738
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700739 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
740 existing.id(),
741 existing.deviceId(),
742 existing.state());
alshabib10580802015-02-18 18:30:33 -0800743 synchronized (existing) {
744 existing.setState(GroupState.PENDING_DELETE);
745 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700746 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
747 deviceId);
alshabib10580802015-02-18 18:30:33 -0800748 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
749 }
750
751 /**
752 * Stores a new group entry, or updates an existing entry.
753 *
754 * @param group group entry
755 */
756 @Override
757 public void addOrUpdateGroupEntry(Group group) {
758 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700759 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
760 group.id());
alshabib10580802015-02-18 18:30:33 -0800761 GroupEvent event = null;
762
763 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700764 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700765 group.id(),
766 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800767 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700768 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700769 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700770 existing.buckets().buckets()
771 .stream()
772 .filter((existingBucket)->(existingBucket.equals(bucket)))
773 .findFirst();
774 if (matchingBucket.isPresent()) {
775 ((StoredGroupBucketEntry) matchingBucket.
776 get()).setPackets(bucket.packets());
777 ((StoredGroupBucketEntry) matchingBucket.
778 get()).setBytes(bucket.bytes());
779 } else {
780 log.warn("addOrUpdateGroupEntry: No matching "
781 + "buckets to update stats");
782 }
783 }
alshabib10580802015-02-18 18:30:33 -0800784 existing.setLife(group.life());
785 existing.setPackets(group.packets());
786 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700787 if ((existing.state() == GroupState.PENDING_ADD) ||
788 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700789 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
790 existing.id(),
791 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700792 existing.state());
alshabib10580802015-02-18 18:30:33 -0800793 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700794 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800795 event = new GroupEvent(Type.GROUP_ADDED, existing);
796 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700797 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
798 existing.id(),
799 existing.deviceId(),
800 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700801 existing.setState(GroupState.ADDED);
802 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800803 event = new GroupEvent(Type.GROUP_UPDATED, existing);
804 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700805 //Re-PUT map entries to trigger map update events
806 getGroupStoreKeyMap().
807 put(new GroupStoreKeyMapKey(existing.deviceId(),
808 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800809 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700810 } else {
811 log.warn("addOrUpdateGroupEntry: Group update "
812 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800813 }
814
815 if (event != null) {
816 notifyDelegate(event);
817 }
818 }
819
820 /**
821 * Removes the group entry from store.
822 *
823 * @param group group entry
824 */
825 @Override
826 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700827 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
828 group.id());
alshabib10580802015-02-18 18:30:33 -0800829
830 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700831 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700832 group.id(),
833 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700834 //Removal from groupid based map will happen in the
835 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700836 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
837 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800838 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700839 } else {
840 log.warn("removeGroupEntry for {} in device{} is "
841 + "not existing in our maps",
842 group.id(),
843 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800844 }
845 }
846
847 @Override
848 public void deviceInitialAuditCompleted(DeviceId deviceId,
849 boolean completed) {
850 synchronized (deviceAuditStatus) {
851 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700852 log.debug("AUDIT completed for device {}",
853 deviceId);
alshabib10580802015-02-18 18:30:33 -0800854 deviceAuditStatus.put(deviceId, true);
855 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700856 List<StoredGroupEntry> pendingGroupRequests =
857 getPendingGroupKeyTable().values()
858 .stream()
859 .filter(g-> g.deviceId().equals(deviceId))
860 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700861 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700862 deviceId,
863 pendingGroupRequests.size());
864 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800865 GroupDescription tmp = new DefaultGroupDescription(
866 group.deviceId(),
867 group.type(),
868 group.buckets(),
869 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700870 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800871 group.appId());
872 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700873 getPendingGroupKeyTable().
874 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800875 }
alshabib10580802015-02-18 18:30:33 -0800876 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700877 Boolean audited = deviceAuditStatus.get(deviceId);
878 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700879 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800880 deviceAuditStatus.put(deviceId, false);
881 }
882 }
883 }
884 }
885
886 @Override
887 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
888 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700889 Boolean audited = deviceAuditStatus.get(deviceId);
890 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800891 }
892 }
893
894 @Override
895 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
896
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700897 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
898 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800899
900 if (existing == null) {
901 log.warn("No group entry with ID {} found ", operation.groupId());
902 return;
903 }
904
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700905 log.warn("groupOperationFailed: group operation {} failed"
906 + "for group {} in device {}",
907 operation.opType(),
908 existing.id(),
909 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800910 switch (operation.opType()) {
911 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700912 if (existing.state() == GroupState.PENDING_ADD) {
913 //TODO: Need to add support for passing the group
914 //operation failure reason from group provider.
915 //If the error type is anything other than GROUP_EXISTS,
916 //then the GROUP_ADD_FAILED event should be raised even
917 //in PENDING_ADD_RETRY state also.
918 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
919 log.warn("groupOperationFailed: cleaningup "
920 + "group {} from store in device {}....",
921 existing.id(),
922 existing.deviceId());
923 //Removal from groupid based map will happen in the
924 //map update listener
925 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
926 existing.appCookie()));
927 }
alshabib10580802015-02-18 18:30:33 -0800928 break;
929 case MODIFY:
930 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
931 break;
932 case DELETE:
933 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
934 break;
935 default:
936 log.warn("Unknown group operation type {}", operation.opType());
937 }
alshabib10580802015-02-18 18:30:33 -0800938 }
939
940 @Override
941 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700942 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700943 group.id(),
944 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800945 ConcurrentMap<GroupId, Group> extraneousIdTable =
946 getExtraneousGroupIdTable(group.deviceId());
947 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700948 // Don't remove the extraneous groups, instead re-use it when
949 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800950 }
951
952 @Override
953 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700954 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700955 group.id(),
956 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800957 ConcurrentMap<GroupId, Group> extraneousIdTable =
958 getExtraneousGroupIdTable(group.deviceId());
959 extraneousIdTable.remove(group.id());
960 }
961
962 @Override
963 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
964 // flatten and make iterator unmodifiable
965 return FluentIterable.from(
966 getExtraneousGroupIdTable(deviceId).values());
967 }
968
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700969 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700970 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700971 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700972 private class GroupStoreKeyMapListener implements
973 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700974
975 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700976 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700977 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700978 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700979 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700980 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700981 if ((key == null) && (group == null)) {
982 log.error("GroupStoreKeyMapListener: Received "
983 + "event {} with null entry", mapEvent.type());
984 return;
985 } else if (group == null) {
986 group = getGroupIdTable(key.deviceId()).values()
987 .stream()
988 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
989 .findFirst().get();
990 if (group == null) {
991 log.error("GroupStoreKeyMapListener: Received "
992 + "event {} with null entry... can not process", mapEvent.type());
993 return;
994 }
995 }
996 log.trace("received groupid map event {} for id {} in device {}",
997 mapEvent.type(),
998 group.id(),
999 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001000 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001001 // Update the group ID table
1002 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001003 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1004 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1005 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1006 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001007 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1008 group.id(),
1009 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001010 } else {
1011 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1012 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001013 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1014 group.id(),
1015 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001016 }
1017 }
1018 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001019 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001020 // Remove the entry from the group ID table
1021 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001022 }
1023
1024 if (groupEvent != null) {
1025 notifyDelegate(groupEvent);
1026 }
1027 }
1028 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001029
1030 private void process(GroupStoreMessage groupOp) {
1031 log.debug("Received remote group operation {} request for device {}",
1032 groupOp.type(),
1033 groupOp.deviceId());
1034 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1035 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1036 return;
1037 }
1038 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1039 storeGroupDescriptionInternal(groupOp.groupDesc());
1040 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1041 updateGroupDescriptionInternal(groupOp.deviceId(),
1042 groupOp.appCookie(),
1043 groupOp.updateType(),
1044 groupOp.updateBuckets(),
1045 groupOp.newAppCookie());
1046 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1047 deleteGroupDescriptionInternal(groupOp.deviceId(),
1048 groupOp.appCookie());
1049 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001050 }
1051
1052 /**
1053 * Flattened map key to be used to store group entries.
1054 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001055 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001056 private final DeviceId deviceId;
1057
1058 public GroupStoreMapKey(DeviceId deviceId) {
1059 this.deviceId = deviceId;
1060 }
1061
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001062 public DeviceId deviceId() {
1063 return deviceId;
1064 }
1065
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001066 @Override
1067 public boolean equals(Object o) {
1068 if (this == o) {
1069 return true;
1070 }
1071 if (!(o instanceof GroupStoreMapKey)) {
1072 return false;
1073 }
1074 GroupStoreMapKey that = (GroupStoreMapKey) o;
1075 return this.deviceId.equals(that.deviceId);
1076 }
1077
1078 @Override
1079 public int hashCode() {
1080 int result = 17;
1081
1082 result = 31 * result + Objects.hash(this.deviceId);
1083
1084 return result;
1085 }
1086 }
1087
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001088 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001089 private final GroupKey appCookie;
1090 public GroupStoreKeyMapKey(DeviceId deviceId,
1091 GroupKey appCookie) {
1092 super(deviceId);
1093 this.appCookie = appCookie;
1094 }
1095
1096 @Override
1097 public boolean equals(Object o) {
1098 if (this == o) {
1099 return true;
1100 }
1101 if (!(o instanceof GroupStoreKeyMapKey)) {
1102 return false;
1103 }
1104 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1105 return (super.equals(that) &&
1106 this.appCookie.equals(that.appCookie));
1107 }
1108
1109 @Override
1110 public int hashCode() {
1111 int result = 17;
1112
1113 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1114
1115 return result;
1116 }
1117 }
1118
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001119 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001120 private final GroupId groupId;
1121 public GroupStoreIdMapKey(DeviceId deviceId,
1122 GroupId groupId) {
1123 super(deviceId);
1124 this.groupId = groupId;
1125 }
1126
1127 @Override
1128 public boolean equals(Object o) {
1129 if (this == o) {
1130 return true;
1131 }
1132 if (!(o instanceof GroupStoreIdMapKey)) {
1133 return false;
1134 }
1135 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1136 return (super.equals(that) &&
1137 this.groupId.equals(that.groupId));
1138 }
1139
1140 @Override
1141 public int hashCode() {
1142 int result = 17;
1143
1144 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1145
1146 return result;
1147 }
1148 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001149
1150 @Override
1151 public void pushGroupMetrics(DeviceId deviceId,
1152 Collection<Group> groupEntries) {
1153 boolean deviceInitialAuditStatus =
1154 deviceInitialAuditStatus(deviceId);
1155 Set<Group> southboundGroupEntries =
1156 Sets.newHashSet(groupEntries);
1157 Set<StoredGroupEntry> storedGroupEntries =
1158 Sets.newHashSet(getStoredGroups(deviceId));
1159 Set<Group> extraneousStoredEntries =
1160 Sets.newHashSet(getExtraneousGroups(deviceId));
1161
1162 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1163 southboundGroupEntries.size(),
1164 deviceId);
1165 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1166 Group group = it.next();
1167 log.trace("Group {} in device {}", group, deviceId);
1168 }
1169
1170 log.trace("Displaying all ({}) stored group entries for device {}",
1171 storedGroupEntries.size(),
1172 deviceId);
1173 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1174 it1.hasNext();) {
1175 Group group = it1.next();
1176 log.trace("Stored Group {} for device {}", group, deviceId);
1177 }
1178
1179 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1180 Group group = it2.next();
1181 if (storedGroupEntries.remove(group)) {
1182 // we both have the group, let's update some info then.
1183 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1184 group.id(), deviceId);
1185 groupAdded(group);
1186 it2.remove();
1187 }
1188 }
1189 for (Group group : southboundGroupEntries) {
1190 if (getGroup(group.deviceId(), group.id()) != null) {
1191 // There is a group existing with the same id
1192 // It is possible that group update is
1193 // in progress while we got a stale info from switch
1194 if (!storedGroupEntries.remove(getGroup(
1195 group.deviceId(), group.id()))) {
1196 log.warn("Group AUDIT: Inconsistent state:"
1197 + "Group exists in ID based table while "
1198 + "not present in key based table");
1199 }
1200 } else {
1201 // there are groups in the switch that aren't in the store
1202 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1203 group.id(), deviceId);
1204 extraneousStoredEntries.remove(group);
1205 extraneousGroup(group);
1206 }
1207 }
1208 for (Group group : storedGroupEntries) {
1209 // there are groups in the store that aren't in the switch
1210 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1211 group.id(), deviceId);
1212 groupMissing(group);
1213 }
1214 for (Group group : extraneousStoredEntries) {
1215 // there are groups in the extraneous store that
1216 // aren't in the switch
1217 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1218 group.id(), deviceId);
1219 removeExtraneousGroupEntry(group);
1220 }
1221
1222 if (!deviceInitialAuditStatus) {
1223 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1224 deviceId);
1225 deviceInitialAuditCompleted(deviceId, true);
1226 }
1227 }
1228
1229 private void groupMissing(Group group) {
1230 switch (group.state()) {
1231 case PENDING_DELETE:
1232 log.debug("Group {} delete confirmation from device {}",
1233 group, group.deviceId());
1234 removeGroupEntry(group);
1235 break;
1236 case ADDED:
1237 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001238 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001239 case PENDING_UPDATE:
1240 log.debug("Group {} is in store but not on device {}",
1241 group, group.deviceId());
1242 StoredGroupEntry existing =
1243 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001244 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001245 existing.id(),
1246 existing.deviceId(),
1247 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001248 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001249 //Re-PUT map entries to trigger map update events
1250 getGroupStoreKeyMap().
1251 put(new GroupStoreKeyMapKey(existing.deviceId(),
1252 existing.appCookie()), existing);
1253 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1254 group));
1255 break;
1256 default:
1257 log.debug("Group {} has not been installed.", group);
1258 break;
1259 }
1260 }
1261
1262 private void extraneousGroup(Group group) {
1263 log.debug("Group {} is on device {} but not in store.",
1264 group, group.deviceId());
1265 addOrUpdateExtraneousGroupEntry(group);
1266 }
1267
1268 private void groupAdded(Group group) {
1269 log.trace("Group {} Added or Updated in device {}",
1270 group, group.deviceId());
1271 addOrUpdateGroupEntry(group);
1272 }
alshabib10580802015-02-18 18:30:33 -08001273}