blob: 1d7ded1b2300bb5296f9c5d51d11b5c045b8d13a [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070022
alshabib10580802015-02-18 18:30:33 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080028import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070029import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080030import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070031import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080032import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.core.DefaultGroupId;
34import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070057import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070059import org.onosproject.store.service.EventuallyConsistentMap;
60import org.onosproject.store.service.EventuallyConsistentMapBuilder;
61import org.onosproject.store.service.EventuallyConsistentMapEvent;
62import org.onosproject.store.service.EventuallyConsistentMapListener;
63import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080064import org.slf4j.Logger;
65
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080068import java.util.Collections;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080070import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070072import java.util.List;
Charles Chan0c7c43b2016-01-14 17:39:20 -080073import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070075import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.concurrent.ConcurrentHashMap;
78import java.util.concurrent.ConcurrentMap;
79import java.util.concurrent.ExecutorService;
80import java.util.concurrent.Executors;
81import java.util.concurrent.atomic.AtomicInteger;
82import java.util.concurrent.atomic.AtomicLong;
83import java.util.stream.Collectors;
84
85import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
86import static org.onlab.util.Tools.groupedThreads;
87import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080088
/**
 * Manages inventory of group entries using eventually consistent maps
 * and cluster messaging to forward group operations to the device master.
 */
92@Component(immediate = true)
93@Service
94public class DistributedGroupStore
95 extends AbstractStore<GroupEvent, GroupStoreDelegate>
96 implements GroupStore {
97
98 private final Logger log = getLogger(getClass());
99
100 private final int dummyId = 0xffffffff;
101 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
102
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected ClusterCommunicationService clusterCommunicator;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700110 protected StorageService storageService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700113 protected MastershipService mastershipService;
114
115 // Per device group table with (device id + app cookie) as key
116 private EventuallyConsistentMap<GroupStoreKeyMapKey,
117 StoredGroupEntry> groupStoreEntriesByKey = null;
118 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700119 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
120 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700121 private EventuallyConsistentMap<GroupStoreKeyMapKey,
122 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800123 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
124 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700125 private ExecutorService messageHandlingExecutor;
126 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800127
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700128 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800129
130 private final AtomicInteger groupIdGen = new AtomicInteger();
131
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700132 private KryoNamespace.Builder kryoBuilder = null;
133
Madan Jampanibcf1a482015-06-24 19:05:56 -0700134 private final AtomicLong sequenceNumber = new AtomicLong(0);
135
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700136 private KryoNamespace clusterMsgSerializer;
137
alshabib10580802015-02-18 18:30:33 -0800138 @Activate
139 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700140 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700141 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700142 .register(DefaultGroup.class,
143 DefaultGroupBucket.class,
144 DefaultGroupDescription.class,
145 DefaultGroupKey.class,
146 GroupDescription.Type.class,
147 Group.GroupState.class,
148 GroupBuckets.class,
149 DefaultGroupId.class,
150 GroupStoreMessage.class,
151 GroupStoreMessage.Type.class,
152 UpdateType.class,
153 GroupStoreMessageSubjects.class,
154 MultiValuedTimestamp.class,
155 GroupStoreKeyMapKey.class,
156 GroupStoreIdMapKey.class,
157 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700158 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700159
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700160 clusterMsgSerializer = kryoBuilder.build();
161
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700162 messageHandlingExecutor = Executors.
163 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
164 groupedThreads("onos/store/group",
165 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700166
167 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700168 clusterMsgSerializer::deserialize,
Madan Jampani01e05fb2015-08-13 13:29:36 -0700169 this::process,
170 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700171
172 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700173 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
174 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
175
176 groupStoreEntriesByKey = keyMapBuilder
177 .withName("groupstorekeymap")
178 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700179 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
180 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700181 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700182 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700183 log.debug("Current size of groupstorekeymap:{}",
184 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700185
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700186 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700187 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
188 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
189
190 auditPendingReqQueue = auditMapBuilder
191 .withName("pendinggroupkeymap")
192 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700193 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
194 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700195 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700196 log.debug("Current size of pendinggroupkeymap:{}",
197 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700198
alshabib10580802015-02-18 18:30:33 -0800199 log.info("Started");
200 }
201
202 @Deactivate
203 public void deactivate() {
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700204 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700205 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700206 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800207 log.info("Stopped");
208 }
209
alshabib10580802015-02-18 18:30:33 -0800210 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700211 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800212 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
213 }
214
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700215 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
216 lazyEmptyGroupIdTable() {
217 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
218 }
219
alshabib10580802015-02-18 18:30:33 -0800220 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700221 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800222 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700223 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800224 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700225 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
226 getGroupStoreKeyMap() {
227 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800228 }
229
230 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700231 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800232 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700233 * @param deviceId identifier of the device
234 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800235 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700236 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
237 return createIfAbsentUnchecked(groupEntriesById,
238 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800239 }
240
241 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800243 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700244 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800245 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700246 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
247 getPendingGroupKeyTable() {
248 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800249 }
250
251 /**
252 * Returns the extraneous group id table for specified device.
253 *
254 * @param deviceId identifier of the device
255 * @return Map representing group key table of given device.
256 */
257 private ConcurrentMap<GroupId, Group>
258 getExtraneousGroupIdTable(DeviceId deviceId) {
259 return createIfAbsentUnchecked(extraneousGroupEntriesById,
260 deviceId,
261 lazyEmptyExtraneousGroupIdTable());
262 }
263
264 /**
265 * Returns the number of groups for the specified device in the store.
266 *
267 * @return number of groups for the specified device
268 */
269 @Override
270 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700271 return (getGroups(deviceId) != null) ?
272 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800273 }
274
275 /**
276 * Returns the groups associated with a device.
277 *
278 * @param deviceId the device ID
279 *
280 * @return the group entries
281 */
282 @Override
283 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800284 // Let ImmutableSet.copyOf do the type conversion
285 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800286 }
287
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700288 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800289 NodeId master = mastershipService.getMasterFor(deviceId);
290 if (master == null) {
291 log.debug("Failed to getGroups: No master for {}", deviceId);
292 return Collections.emptySet();
293 }
294
295 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
296 .stream()
297 .filter(input -> input.deviceId().equals(deviceId))
298 .collect(Collectors.toSet());
299 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700300 }
301
alshabib10580802015-02-18 18:30:33 -0800302 /**
303 * Returns the stored group entry.
304 *
305 * @param deviceId the device ID
306 * @param appCookie the group key
307 *
308 * @return a group associated with the key
309 */
310 @Override
311 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700312 return getStoredGroupEntry(deviceId, appCookie);
313 }
314
315 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
316 GroupKey appCookie) {
317 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
318 appCookie));
319 }
320
321 @Override
322 public Group getGroup(DeviceId deviceId, GroupId groupId) {
323 return getStoredGroupEntry(deviceId, groupId);
324 }
325
326 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
327 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700328 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800329 }
330
331 private int getFreeGroupIdValue(DeviceId deviceId) {
332 int freeId = groupIdGen.incrementAndGet();
333
334 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700335 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800336 if (existing == null) {
337 existing = (
338 extraneousGroupEntriesById.get(deviceId) != null) ?
339 extraneousGroupEntriesById.get(deviceId).
340 get(new DefaultGroupId(freeId)) :
341 null;
342 }
343 if (existing != null) {
344 freeId = groupIdGen.incrementAndGet();
345 } else {
346 break;
347 }
348 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700349 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800350 return freeId;
351 }
352
353 /**
354 * Stores a new group entry using the information from group description.
355 *
356 * @param groupDesc group description to be used to create group entry
357 */
358 @Override
359 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700360 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800361 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800362 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
363 if (existingGroup != null) {
Saurav Das4ce45962015-11-24 23:21:05 -0800364 log.warn("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800365 groupDesc.appCookie(), groupDesc.deviceId(),
366 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800367 return;
368 }
369
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700370 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700371 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700372 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700373 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700374 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
375 log.error("No Master for device {}..."
376 + "Can not perform add group operation",
377 groupDesc.deviceId());
378 //TODO: Send Group operation failure event
379 return;
380 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700381 GroupStoreMessage groupOp = GroupStoreMessage.
382 createGroupAddRequestMsg(groupDesc.deviceId(),
383 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700384
Madan Jampani175e8fd2015-05-20 14:10:45 -0700385 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700386 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700387 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700388 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
389 if (error != null) {
390 log.warn("Failed to send request to master: {} to {}",
391 groupOp,
392 mastershipService.getMasterFor(groupDesc.deviceId()));
393 //TODO: Send Group operation failure event
394 } else {
395 log.debug("Sent Group operation request for device {} "
396 + "to remote MASTER {}",
397 groupDesc.deviceId(),
398 mastershipService.getMasterFor(groupDesc.deviceId()));
399 }
400 });
alshabib10580802015-02-18 18:30:33 -0800401 return;
402 }
403
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700404 log.debug("Store group for device {} is getting handled locally",
405 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800406 storeGroupDescriptionInternal(groupDesc);
407 }
408
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700409 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
410 ConcurrentMap<GroupId, Group> extraneousMap =
411 extraneousGroupEntriesById.get(deviceId);
412 if (extraneousMap == null) {
413 return null;
414 }
415 return extraneousMap.get(new DefaultGroupId(groupId));
416 }
417
418 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
419 GroupBuckets buckets) {
420 ConcurrentMap<GroupId, Group> extraneousMap =
421 extraneousGroupEntriesById.get(deviceId);
422 if (extraneousMap == null) {
423 return null;
424 }
425
426 for (Group extraneousGroup:extraneousMap.values()) {
427 if (extraneousGroup.buckets().equals(buckets)) {
428 return extraneousGroup;
429 }
430 }
431 return null;
432 }
433
alshabib10580802015-02-18 18:30:33 -0800434 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
435 // Check if a group is existing with the same key
436 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
437 return;
438 }
439
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700440 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
441 // Device group audit has not completed yet
442 // Add this group description to pending group key table
443 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700444 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700445 groupDesc.deviceId());
446 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
447 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
448 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
449 getPendingGroupKeyTable();
450 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
451 groupDesc.appCookie()),
452 group);
453 return;
454 }
455
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700456 Group matchingExtraneousGroup = null;
457 if (groupDesc.givenGroupId() != null) {
458 //Check if there is a extraneous group existing with the same Id
459 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
460 groupDesc.deviceId(), groupDesc.givenGroupId());
461 if (matchingExtraneousGroup != null) {
462 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
463 groupDesc.deviceId(),
464 groupDesc.givenGroupId());
465 //Check if the group buckets matches with user provided buckets
466 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
467 //Group is already existing with the same buckets and Id
468 // Create a group entry object
469 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
470 groupDesc.deviceId(),
471 groupDesc.givenGroupId());
472 StoredGroupEntry group = new DefaultGroup(
473 matchingExtraneousGroup.id(), groupDesc);
474 // Insert the newly created group entry into key and id maps
475 getGroupStoreKeyMap().
476 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
477 groupDesc.appCookie()), group);
478 // Ensure it also inserted into group id based table to
479 // avoid any chances of duplication in group id generation
480 getGroupIdTable(groupDesc.deviceId()).
481 put(matchingExtraneousGroup.id(), group);
482 addOrUpdateGroupEntry(matchingExtraneousGroup);
483 removeExtraneousGroupEntry(matchingExtraneousGroup);
484 return;
485 } else {
486 //Group buckets are not matching. Update group
487 //with user provided buckets.
488 //TODO
489 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
490 groupDesc.deviceId(),
491 groupDesc.givenGroupId());
492 }
493 }
494 } else {
495 //Check if there is an extraneous group with user provided buckets
496 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
497 groupDesc.deviceId(), groupDesc.buckets());
498 if (matchingExtraneousGroup != null) {
499 //Group is already existing with the same buckets.
500 //So reuse this group.
501 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
502 groupDesc.deviceId());
503 //Create a group entry object
504 StoredGroupEntry group = new DefaultGroup(
505 matchingExtraneousGroup.id(), groupDesc);
506 // Insert the newly created group entry into key and id maps
507 getGroupStoreKeyMap().
508 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
509 groupDesc.appCookie()), group);
510 // Ensure it also inserted into group id based table to
511 // avoid any chances of duplication in group id generation
512 getGroupIdTable(groupDesc.deviceId()).
513 put(matchingExtraneousGroup.id(), group);
514 addOrUpdateGroupEntry(matchingExtraneousGroup);
515 removeExtraneousGroupEntry(matchingExtraneousGroup);
516 return;
517 } else {
518 //TODO: Check if there are any empty groups that can be used here
519 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
520 groupDesc.deviceId());
521 }
522 }
523
Saurav Das100e3b82015-04-30 11:12:10 -0700524 GroupId id = null;
525 if (groupDesc.givenGroupId() == null) {
526 // Get a new group identifier
527 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
528 } else {
529 id = new DefaultGroupId(groupDesc.givenGroupId());
530 }
alshabib10580802015-02-18 18:30:33 -0800531 // Create a group entry object
532 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700533 // Insert the newly created group entry into key and id maps
534 getGroupStoreKeyMap().
535 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
536 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700537 // Ensure it also inserted into group id based table to
538 // avoid any chances of duplication in group id generation
539 getGroupIdTable(groupDesc.deviceId()).
540 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700541 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
542 id,
543 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800544 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
545 group));
546 }
547
548 /**
549 * Updates the existing group entry with the information
550 * from group description.
551 *
552 * @param deviceId the device ID
553 * @param oldAppCookie the current group key
554 * @param type update type
555 * @param newBuckets group buckets for updates
556 * @param newAppCookie optional new group key
557 */
558 @Override
559 public void updateGroupDescription(DeviceId deviceId,
560 GroupKey oldAppCookie,
561 UpdateType type,
562 GroupBuckets newBuckets,
563 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700564 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700565 if (mastershipService.getMasterFor(deviceId) != null &&
566 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700567 log.debug("updateGroupDescription: Device {} local role is not MASTER",
568 deviceId);
569 if (mastershipService.getMasterFor(deviceId) == null) {
570 log.error("No Master for device {}..."
571 + "Can not perform update group operation",
572 deviceId);
573 //TODO: Send Group operation failure event
574 return;
575 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700576 GroupStoreMessage groupOp = GroupStoreMessage.
577 createGroupUpdateRequestMsg(deviceId,
578 oldAppCookie,
579 type,
580 newBuckets,
581 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700582
Madan Jampani175e8fd2015-05-20 14:10:45 -0700583 clusterCommunicator.unicast(groupOp,
584 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700585 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700586 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
587 if (error != null) {
588 log.warn("Failed to send request to master: {} to {}",
589 groupOp,
590 mastershipService.getMasterFor(deviceId), error);
591 }
592 //TODO: Send Group operation failure event
593 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700594 return;
595 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700596 log.debug("updateGroupDescription for device {} is getting handled locally",
597 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700598 updateGroupDescriptionInternal(deviceId,
599 oldAppCookie,
600 type,
601 newBuckets,
602 newAppCookie);
603 }
604
605 private void updateGroupDescriptionInternal(DeviceId deviceId,
606 GroupKey oldAppCookie,
607 UpdateType type,
608 GroupBuckets newBuckets,
609 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800610 // Check if a group is existing with the provided key
611 Group oldGroup = getGroup(deviceId, oldAppCookie);
612 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700613 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800614 return;
615 }
616
617 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
618 type,
619 newBuckets);
620 if (newBucketList != null) {
621 // Create a new group object from the old group
622 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
623 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
624 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
625 oldGroup.deviceId(),
626 oldGroup.type(),
627 updatedBuckets,
628 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700629 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800630 oldGroup.appId());
631 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
632 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700633 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
634 oldGroup.id(),
635 oldGroup.deviceId(),
636 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800637 newGroup.setState(GroupState.PENDING_UPDATE);
638 newGroup.setLife(oldGroup.life());
639 newGroup.setPackets(oldGroup.packets());
640 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700641 //Update the group entry in groupkey based map.
642 //Update to groupid based map will happen in the
643 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700644 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
645 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700646 getGroupStoreKeyMap().
647 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
648 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800649 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700650 } else {
651 log.warn("updateGroupDescriptionInternal with type {}: No "
652 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800653 }
654 }
655
656 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
657 UpdateType type,
658 GroupBuckets buckets) {
659 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700660 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800661 boolean groupDescUpdated = false;
662
663 if (type == UpdateType.ADD) {
664 // Check if the any of the new buckets are part of
665 // the old bucket list
666 for (GroupBucket addBucket:buckets.buckets()) {
667 if (!newBucketList.contains(addBucket)) {
668 newBucketList.add(addBucket);
669 groupDescUpdated = true;
670 }
671 }
672 } else if (type == UpdateType.REMOVE) {
673 // Check if the to be removed buckets are part of the
674 // old bucket list
675 for (GroupBucket removeBucket:buckets.buckets()) {
676 if (newBucketList.contains(removeBucket)) {
677 newBucketList.remove(removeBucket);
678 groupDescUpdated = true;
679 }
680 }
681 }
682
683 if (groupDescUpdated) {
684 return newBucketList;
685 } else {
686 return null;
687 }
688 }
689
690 /**
691 * Triggers deleting the existing group entry.
692 *
693 * @param deviceId the device ID
694 * @param appCookie the group key
695 */
696 @Override
697 public void deleteGroupDescription(DeviceId deviceId,
698 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700699 // Check if group to be deleted by a remote instance
700 if (mastershipService.
701 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700702 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
703 deviceId);
704 if (mastershipService.getMasterFor(deviceId) == null) {
705 log.error("No Master for device {}..."
706 + "Can not perform delete group operation",
707 deviceId);
708 //TODO: Send Group operation failure event
709 return;
710 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700711 GroupStoreMessage groupOp = GroupStoreMessage.
712 createGroupDeleteRequestMsg(deviceId,
713 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700714
Madan Jampani175e8fd2015-05-20 14:10:45 -0700715 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700716 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700717 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700718 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
719 if (error != null) {
720 log.warn("Failed to send request to master: {} to {}",
721 groupOp,
722 mastershipService.getMasterFor(deviceId), error);
723 }
724 //TODO: Send Group operation failure event
725 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700726 return;
727 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700728 log.debug("deleteGroupDescription in device {} is getting handled locally",
729 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700730 deleteGroupDescriptionInternal(deviceId, appCookie);
731 }
732
733 private void deleteGroupDescriptionInternal(DeviceId deviceId,
734 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800735 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700736 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800737 if (existing == null) {
738 return;
739 }
740
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700741 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
742 existing.id(),
743 existing.deviceId(),
744 existing.state());
alshabib10580802015-02-18 18:30:33 -0800745 synchronized (existing) {
746 existing.setState(GroupState.PENDING_DELETE);
747 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700748 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
749 deviceId);
alshabib10580802015-02-18 18:30:33 -0800750 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
751 }
752
753 /**
754 * Stores a new group entry, or updates an existing entry.
755 *
756 * @param group group entry
757 */
758 @Override
759 public void addOrUpdateGroupEntry(Group group) {
760 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700761 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
762 group.id());
alshabib10580802015-02-18 18:30:33 -0800763 GroupEvent event = null;
764
765 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700766 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700767 group.id(),
768 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800769 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700770 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700771 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700772 existing.buckets().buckets()
773 .stream()
774 .filter((existingBucket)->(existingBucket.equals(bucket)))
775 .findFirst();
776 if (matchingBucket.isPresent()) {
777 ((StoredGroupBucketEntry) matchingBucket.
778 get()).setPackets(bucket.packets());
779 ((StoredGroupBucketEntry) matchingBucket.
780 get()).setBytes(bucket.bytes());
781 } else {
782 log.warn("addOrUpdateGroupEntry: No matching "
783 + "buckets to update stats");
784 }
785 }
alshabib10580802015-02-18 18:30:33 -0800786 existing.setLife(group.life());
787 existing.setPackets(group.packets());
788 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700789 if ((existing.state() == GroupState.PENDING_ADD) ||
790 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700791 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
792 existing.id(),
793 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700794 existing.state());
alshabib10580802015-02-18 18:30:33 -0800795 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700796 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800797 event = new GroupEvent(Type.GROUP_ADDED, existing);
798 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700799 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
800 existing.id(),
801 existing.deviceId(),
802 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700803 existing.setState(GroupState.ADDED);
804 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800805 event = new GroupEvent(Type.GROUP_UPDATED, existing);
806 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700807 //Re-PUT map entries to trigger map update events
808 getGroupStoreKeyMap().
809 put(new GroupStoreKeyMapKey(existing.deviceId(),
810 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800811 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700812 } else {
813 log.warn("addOrUpdateGroupEntry: Group update "
814 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800815 }
816
817 if (event != null) {
818 notifyDelegate(event);
819 }
820 }
821
822 /**
823 * Removes the group entry from store.
824 *
825 * @param group group entry
826 */
827 @Override
828 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700829 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
830 group.id());
alshabib10580802015-02-18 18:30:33 -0800831
832 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700833 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700834 group.id(),
835 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700836 //Removal from groupid based map will happen in the
837 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700838 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
839 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800840 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700841 } else {
842 log.warn("removeGroupEntry for {} in device{} is "
843 + "not existing in our maps",
844 group.id(),
845 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800846 }
847 }
848
849 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800850 public void purgeGroupEntry(DeviceId deviceId) {
851 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
852 new HashSet<>();
853
854 groupStoreEntriesByKey.entrySet().stream()
855 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
856 .forEach(entryPendingRemove::add);
857
858 entryPendingRemove.forEach(entry -> {
859 groupStoreEntriesByKey.remove(entry.getKey());
860 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
861 });
862 }
863
864 @Override
alshabib10580802015-02-18 18:30:33 -0800865 public void deviceInitialAuditCompleted(DeviceId deviceId,
866 boolean completed) {
867 synchronized (deviceAuditStatus) {
868 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700869 log.debug("AUDIT completed for device {}",
870 deviceId);
alshabib10580802015-02-18 18:30:33 -0800871 deviceAuditStatus.put(deviceId, true);
872 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700873 List<StoredGroupEntry> pendingGroupRequests =
874 getPendingGroupKeyTable().values()
875 .stream()
876 .filter(g-> g.deviceId().equals(deviceId))
877 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700878 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700879 deviceId,
880 pendingGroupRequests.size());
881 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800882 GroupDescription tmp = new DefaultGroupDescription(
883 group.deviceId(),
884 group.type(),
885 group.buckets(),
886 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700887 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800888 group.appId());
889 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700890 getPendingGroupKeyTable().
891 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800892 }
alshabib10580802015-02-18 18:30:33 -0800893 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700894 Boolean audited = deviceAuditStatus.get(deviceId);
895 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700896 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800897 deviceAuditStatus.put(deviceId, false);
898 }
899 }
900 }
901 }
902
903 @Override
904 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
905 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700906 Boolean audited = deviceAuditStatus.get(deviceId);
907 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800908 }
909 }
910
911 @Override
912 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
913
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700914 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
915 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800916
917 if (existing == null) {
918 log.warn("No group entry with ID {} found ", operation.groupId());
919 return;
920 }
921
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700922 log.warn("groupOperationFailed: group operation {} failed"
923 + "for group {} in device {}",
924 operation.opType(),
925 existing.id(),
926 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800927 switch (operation.opType()) {
928 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700929 if (existing.state() == GroupState.PENDING_ADD) {
930 //TODO: Need to add support for passing the group
931 //operation failure reason from group provider.
932 //If the error type is anything other than GROUP_EXISTS,
933 //then the GROUP_ADD_FAILED event should be raised even
934 //in PENDING_ADD_RETRY state also.
935 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
936 log.warn("groupOperationFailed: cleaningup "
937 + "group {} from store in device {}....",
938 existing.id(),
939 existing.deviceId());
940 //Removal from groupid based map will happen in the
941 //map update listener
942 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
943 existing.appCookie()));
944 }
alshabib10580802015-02-18 18:30:33 -0800945 break;
946 case MODIFY:
947 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
948 break;
949 case DELETE:
950 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
951 break;
952 default:
953 log.warn("Unknown group operation type {}", operation.opType());
954 }
alshabib10580802015-02-18 18:30:33 -0800955 }
956
957 @Override
958 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700959 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700960 group.id(),
961 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800962 ConcurrentMap<GroupId, Group> extraneousIdTable =
963 getExtraneousGroupIdTable(group.deviceId());
964 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700965 // Don't remove the extraneous groups, instead re-use it when
966 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800967 }
968
969 @Override
970 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700971 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700972 group.id(),
973 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800974 ConcurrentMap<GroupId, Group> extraneousIdTable =
975 getExtraneousGroupIdTable(group.deviceId());
976 extraneousIdTable.remove(group.id());
977 }
978
979 @Override
980 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
981 // flatten and make iterator unmodifiable
982 return FluentIterable.from(
983 getExtraneousGroupIdTable(deviceId).values());
984 }
985
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700986 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700987 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700988 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700989 private class GroupStoreKeyMapListener implements
990 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700991
992 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700993 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700994 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700995 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700996 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700997 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700998 if ((key == null) && (group == null)) {
999 log.error("GroupStoreKeyMapListener: Received "
1000 + "event {} with null entry", mapEvent.type());
1001 return;
1002 } else if (group == null) {
1003 group = getGroupIdTable(key.deviceId()).values()
1004 .stream()
1005 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1006 .findFirst().get();
1007 if (group == null) {
1008 log.error("GroupStoreKeyMapListener: Received "
1009 + "event {} with null entry... can not process", mapEvent.type());
1010 return;
1011 }
1012 }
1013 log.trace("received groupid map event {} for id {} in device {}",
1014 mapEvent.type(),
1015 group.id(),
1016 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001017 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001018 // Update the group ID table
1019 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001020 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1021 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1022 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1023 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001024 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1025 group.id(),
1026 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001027 } else {
1028 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1029 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001030 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1031 group.id(),
1032 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001033 }
1034 }
1035 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001036 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001037 // Remove the entry from the group ID table
1038 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001039 }
1040
1041 if (groupEvent != null) {
1042 notifyDelegate(groupEvent);
1043 }
1044 }
1045 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001046
1047 private void process(GroupStoreMessage groupOp) {
1048 log.debug("Received remote group operation {} request for device {}",
1049 groupOp.type(),
1050 groupOp.deviceId());
1051 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1052 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1053 return;
1054 }
1055 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1056 storeGroupDescriptionInternal(groupOp.groupDesc());
1057 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1058 updateGroupDescriptionInternal(groupOp.deviceId(),
1059 groupOp.appCookie(),
1060 groupOp.updateType(),
1061 groupOp.updateBuckets(),
1062 groupOp.newAppCookie());
1063 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1064 deleteGroupDescriptionInternal(groupOp.deviceId(),
1065 groupOp.appCookie());
1066 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001067 }
1068
1069 /**
1070 * Flattened map key to be used to store group entries.
1071 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001072 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001073 private final DeviceId deviceId;
1074
1075 public GroupStoreMapKey(DeviceId deviceId) {
1076 this.deviceId = deviceId;
1077 }
1078
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001079 public DeviceId deviceId() {
1080 return deviceId;
1081 }
1082
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001083 @Override
1084 public boolean equals(Object o) {
1085 if (this == o) {
1086 return true;
1087 }
1088 if (!(o instanceof GroupStoreMapKey)) {
1089 return false;
1090 }
1091 GroupStoreMapKey that = (GroupStoreMapKey) o;
1092 return this.deviceId.equals(that.deviceId);
1093 }
1094
1095 @Override
1096 public int hashCode() {
1097 int result = 17;
1098
1099 result = 31 * result + Objects.hash(this.deviceId);
1100
1101 return result;
1102 }
1103 }
1104
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001105 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001106 private final GroupKey appCookie;
1107 public GroupStoreKeyMapKey(DeviceId deviceId,
1108 GroupKey appCookie) {
1109 super(deviceId);
1110 this.appCookie = appCookie;
1111 }
1112
1113 @Override
1114 public boolean equals(Object o) {
1115 if (this == o) {
1116 return true;
1117 }
1118 if (!(o instanceof GroupStoreKeyMapKey)) {
1119 return false;
1120 }
1121 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1122 return (super.equals(that) &&
1123 this.appCookie.equals(that.appCookie));
1124 }
1125
1126 @Override
1127 public int hashCode() {
1128 int result = 17;
1129
1130 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1131
1132 return result;
1133 }
1134 }
1135
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001136 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001137 private final GroupId groupId;
1138 public GroupStoreIdMapKey(DeviceId deviceId,
1139 GroupId groupId) {
1140 super(deviceId);
1141 this.groupId = groupId;
1142 }
1143
1144 @Override
1145 public boolean equals(Object o) {
1146 if (this == o) {
1147 return true;
1148 }
1149 if (!(o instanceof GroupStoreIdMapKey)) {
1150 return false;
1151 }
1152 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1153 return (super.equals(that) &&
1154 this.groupId.equals(that.groupId));
1155 }
1156
1157 @Override
1158 public int hashCode() {
1159 int result = 17;
1160
1161 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1162
1163 return result;
1164 }
1165 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001166
1167 @Override
1168 public void pushGroupMetrics(DeviceId deviceId,
1169 Collection<Group> groupEntries) {
1170 boolean deviceInitialAuditStatus =
1171 deviceInitialAuditStatus(deviceId);
1172 Set<Group> southboundGroupEntries =
1173 Sets.newHashSet(groupEntries);
1174 Set<StoredGroupEntry> storedGroupEntries =
1175 Sets.newHashSet(getStoredGroups(deviceId));
1176 Set<Group> extraneousStoredEntries =
1177 Sets.newHashSet(getExtraneousGroups(deviceId));
1178
1179 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1180 southboundGroupEntries.size(),
1181 deviceId);
1182 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1183 Group group = it.next();
1184 log.trace("Group {} in device {}", group, deviceId);
1185 }
1186
1187 log.trace("Displaying all ({}) stored group entries for device {}",
1188 storedGroupEntries.size(),
1189 deviceId);
1190 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1191 it1.hasNext();) {
1192 Group group = it1.next();
1193 log.trace("Stored Group {} for device {}", group, deviceId);
1194 }
1195
1196 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1197 Group group = it2.next();
1198 if (storedGroupEntries.remove(group)) {
1199 // we both have the group, let's update some info then.
1200 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1201 group.id(), deviceId);
1202 groupAdded(group);
1203 it2.remove();
1204 }
1205 }
1206 for (Group group : southboundGroupEntries) {
1207 if (getGroup(group.deviceId(), group.id()) != null) {
1208 // There is a group existing with the same id
1209 // It is possible that group update is
1210 // in progress while we got a stale info from switch
1211 if (!storedGroupEntries.remove(getGroup(
1212 group.deviceId(), group.id()))) {
1213 log.warn("Group AUDIT: Inconsistent state:"
1214 + "Group exists in ID based table while "
1215 + "not present in key based table");
1216 }
1217 } else {
1218 // there are groups in the switch that aren't in the store
1219 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1220 group.id(), deviceId);
1221 extraneousStoredEntries.remove(group);
1222 extraneousGroup(group);
1223 }
1224 }
1225 for (Group group : storedGroupEntries) {
1226 // there are groups in the store that aren't in the switch
1227 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1228 group.id(), deviceId);
1229 groupMissing(group);
1230 }
1231 for (Group group : extraneousStoredEntries) {
1232 // there are groups in the extraneous store that
1233 // aren't in the switch
1234 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1235 group.id(), deviceId);
1236 removeExtraneousGroupEntry(group);
1237 }
1238
1239 if (!deviceInitialAuditStatus) {
1240 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1241 deviceId);
1242 deviceInitialAuditCompleted(deviceId, true);
1243 }
1244 }
1245
1246 private void groupMissing(Group group) {
1247 switch (group.state()) {
1248 case PENDING_DELETE:
1249 log.debug("Group {} delete confirmation from device {}",
1250 group, group.deviceId());
1251 removeGroupEntry(group);
1252 break;
1253 case ADDED:
1254 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001255 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001256 case PENDING_UPDATE:
1257 log.debug("Group {} is in store but not on device {}",
1258 group, group.deviceId());
1259 StoredGroupEntry existing =
1260 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001261 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001262 existing.id(),
1263 existing.deviceId(),
1264 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001265 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001266 //Re-PUT map entries to trigger map update events
1267 getGroupStoreKeyMap().
1268 put(new GroupStoreKeyMapKey(existing.deviceId(),
1269 existing.appCookie()), existing);
1270 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1271 group));
1272 break;
1273 default:
1274 log.debug("Group {} has not been installed.", group);
1275 break;
1276 }
1277 }
1278
1279 private void extraneousGroup(Group group) {
1280 log.debug("Group {} is on device {} but not in store.",
1281 group, group.deviceId());
1282 addOrUpdateExtraneousGroupEntry(group);
1283 }
1284
1285 private void groupAdded(Group group) {
1286 log.trace("Group {} Added or Updated in device {}",
1287 group, group.deviceId());
1288 addOrUpdateGroupEntry(group);
1289 }
alshabib10580802015-02-18 18:30:33 -08001290}