blob: 250dfb9b83796b7dc224e69e3897effaf543350b [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070022
alshabib10580802015-02-18 18:30:33 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080028import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070029import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080030import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070031import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080032import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.core.DefaultGroupId;
34import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070057import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080059import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.MapEventListener;
62import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070063import org.onosproject.store.service.StorageService;
Madan Jampani0b847532016-03-03 13:44:15 -080064import org.onosproject.store.service.Versioned;
alshabib10580802015-02-18 18:30:33 -080065import org.slf4j.Logger;
66
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070068import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080069import java.util.Collections;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080071import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070072import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080074import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080075import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070076import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070077import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070078import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070079import java.util.concurrent.ConcurrentHashMap;
80import java.util.concurrent.ConcurrentMap;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
83import java.util.concurrent.atomic.AtomicInteger;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070084import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
90/**
Saurav Das0fd79d92016-03-07 10:58:36 -080091 * Manages inventory of group entries using distributed group stores from the
92 * storage service.
alshabib10580802015-02-18 18:30:33 -080093 */
94@Component(immediate = true)
95@Service
96public class DistributedGroupStore
97 extends AbstractStore<GroupEvent, GroupStoreDelegate>
98 implements GroupStore {
99
100 private final Logger log = getLogger(getClass());
101
102 private final int dummyId = 0xffffffff;
103 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
104
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ClusterCommunicationService clusterCommunicator;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700112 protected StorageService storageService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700115 protected MastershipService mastershipService;
116
117 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800118 private ConsistentMap<GroupStoreKeyMapKey,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700119 StoredGroupEntry> groupStoreEntriesByKey = null;
120 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700121 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
122 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800123 private ConsistentMap<GroupStoreKeyMapKey,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700124 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800125 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
126 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700127 private ExecutorService messageHandlingExecutor;
128 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800129
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700130 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800131
132 private final AtomicInteger groupIdGen = new AtomicInteger();
133
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 private KryoNamespace.Builder kryoBuilder = null;
135
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700136 private KryoNamespace clusterMsgSerializer;
137
alshabib10580802015-02-18 18:30:33 -0800138 @Activate
139 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700140 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700141 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700142 .register(DefaultGroup.class,
143 DefaultGroupBucket.class,
144 DefaultGroupDescription.class,
145 DefaultGroupKey.class,
146 GroupDescription.Type.class,
147 Group.GroupState.class,
148 GroupBuckets.class,
149 DefaultGroupId.class,
150 GroupStoreMessage.class,
151 GroupStoreMessage.Type.class,
152 UpdateType.class,
153 GroupStoreMessageSubjects.class,
154 MultiValuedTimestamp.class,
155 GroupStoreKeyMapKey.class,
156 GroupStoreIdMapKey.class,
157 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700158 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700159
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700160 clusterMsgSerializer = kryoBuilder.build();
161
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700162 messageHandlingExecutor = Executors.
163 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
164 groupedThreads("onos/store/group",
165 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700166
167 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700168 clusterMsgSerializer::deserialize,
Madan Jampani01e05fb2015-08-13 13:29:36 -0700169 this::process,
170 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700171
Madan Jampani0b847532016-03-03 13:44:15 -0800172 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700173
Madan Jampani0b847532016-03-03 13:44:15 -0800174 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
175 .withName("onos-group-store-keymap")
176 .withSerializer(Serializer.using(kryoBuilder.build()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700177 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700178 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700179 log.debug("Current size of groupstorekeymap:{}",
180 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700181
Madan Jampani0b847532016-03-03 13:44:15 -0800182 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700183
Madan Jampani0b847532016-03-03 13:44:15 -0800184 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
185 .withName("onos-pending-group-keymap")
186 .withSerializer(Serializer.using(kryoBuilder.build()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700187 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700188 log.debug("Current size of pendinggroupkeymap:{}",
189 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700190
alshabib10580802015-02-18 18:30:33 -0800191 log.info("Started");
192 }
193
194 @Deactivate
195 public void deactivate() {
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700196 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700197 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700198 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800199 log.info("Stopped");
200 }
201
alshabib10580802015-02-18 18:30:33 -0800202 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700203 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800204 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
205 }
206
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700207 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
208 lazyEmptyGroupIdTable() {
209 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
210 }
211
alshabib10580802015-02-18 18:30:33 -0800212 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700213 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800214 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700215 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800216 */
Madan Jampani0b847532016-03-03 13:44:15 -0800217 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800219 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800220 }
221
222 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700223 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800224 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700225 * @param deviceId identifier of the device
226 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800227 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700228 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
229 return createIfAbsentUnchecked(groupEntriesById,
230 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800231 }
232
233 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700234 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800235 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700236 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800237 */
Madan Jampani0b847532016-03-03 13:44:15 -0800238 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700239 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800240 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800241 }
242
243 /**
244 * Returns the extraneous group id table for specified device.
245 *
246 * @param deviceId identifier of the device
247 * @return Map representing group key table of given device.
248 */
249 private ConcurrentMap<GroupId, Group>
250 getExtraneousGroupIdTable(DeviceId deviceId) {
251 return createIfAbsentUnchecked(extraneousGroupEntriesById,
252 deviceId,
253 lazyEmptyExtraneousGroupIdTable());
254 }
255
256 /**
257 * Returns the number of groups for the specified device in the store.
258 *
259 * @return number of groups for the specified device
260 */
261 @Override
262 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700263 return (getGroups(deviceId) != null) ?
264 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800265 }
266
267 /**
268 * Returns the groups associated with a device.
269 *
270 * @param deviceId the device ID
271 *
272 * @return the group entries
273 */
274 @Override
275 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800276 // Let ImmutableSet.copyOf do the type conversion
277 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800278 }
279
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700280 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800281 NodeId master = mastershipService.getMasterFor(deviceId);
282 if (master == null) {
283 log.debug("Failed to getGroups: No master for {}", deviceId);
284 return Collections.emptySet();
285 }
286
287 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
288 .stream()
289 .filter(input -> input.deviceId().equals(deviceId))
290 .collect(Collectors.toSet());
291 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700292 }
293
alshabib10580802015-02-18 18:30:33 -0800294 /**
295 * Returns the stored group entry.
296 *
297 * @param deviceId the device ID
298 * @param appCookie the group key
299 *
300 * @return a group associated with the key
301 */
302 @Override
303 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700304 return getStoredGroupEntry(deviceId, appCookie);
305 }
306
307 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
308 GroupKey appCookie) {
309 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
310 appCookie));
311 }
312
313 @Override
314 public Group getGroup(DeviceId deviceId, GroupId groupId) {
315 return getStoredGroupEntry(deviceId, groupId);
316 }
317
318 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
319 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700320 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800321 }
322
323 private int getFreeGroupIdValue(DeviceId deviceId) {
324 int freeId = groupIdGen.incrementAndGet();
325
326 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700327 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800328 if (existing == null) {
329 existing = (
330 extraneousGroupEntriesById.get(deviceId) != null) ?
331 extraneousGroupEntriesById.get(deviceId).
332 get(new DefaultGroupId(freeId)) :
333 null;
334 }
335 if (existing != null) {
336 freeId = groupIdGen.incrementAndGet();
337 } else {
338 break;
339 }
340 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700341 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800342 return freeId;
343 }
344
345 /**
346 * Stores a new group entry using the information from group description.
347 *
348 * @param groupDesc group description to be used to create group entry
349 */
350 @Override
351 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700352 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800353 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800354 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
355 if (existingGroup != null) {
Saurav Das4ce45962015-11-24 23:21:05 -0800356 log.warn("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800357 groupDesc.appCookie(), groupDesc.deviceId(),
358 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800359 return;
360 }
361
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700362 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700363 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700364 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700365 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700366 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
367 log.error("No Master for device {}..."
368 + "Can not perform add group operation",
369 groupDesc.deviceId());
370 //TODO: Send Group operation failure event
371 return;
372 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700373 GroupStoreMessage groupOp = GroupStoreMessage.
374 createGroupAddRequestMsg(groupDesc.deviceId(),
375 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700376
Madan Jampani175e8fd2015-05-20 14:10:45 -0700377 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700378 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700379 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700380 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
381 if (error != null) {
382 log.warn("Failed to send request to master: {} to {}",
383 groupOp,
384 mastershipService.getMasterFor(groupDesc.deviceId()));
385 //TODO: Send Group operation failure event
386 } else {
387 log.debug("Sent Group operation request for device {} "
388 + "to remote MASTER {}",
389 groupDesc.deviceId(),
390 mastershipService.getMasterFor(groupDesc.deviceId()));
391 }
392 });
alshabib10580802015-02-18 18:30:33 -0800393 return;
394 }
395
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700396 log.debug("Store group for device {} is getting handled locally",
397 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800398 storeGroupDescriptionInternal(groupDesc);
399 }
400
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700401 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
402 ConcurrentMap<GroupId, Group> extraneousMap =
403 extraneousGroupEntriesById.get(deviceId);
404 if (extraneousMap == null) {
405 return null;
406 }
407 return extraneousMap.get(new DefaultGroupId(groupId));
408 }
409
410 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
411 GroupBuckets buckets) {
412 ConcurrentMap<GroupId, Group> extraneousMap =
413 extraneousGroupEntriesById.get(deviceId);
414 if (extraneousMap == null) {
415 return null;
416 }
417
418 for (Group extraneousGroup:extraneousMap.values()) {
419 if (extraneousGroup.buckets().equals(buckets)) {
420 return extraneousGroup;
421 }
422 }
423 return null;
424 }
425
alshabib10580802015-02-18 18:30:33 -0800426 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
427 // Check if a group is existing with the same key
428 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
429 return;
430 }
431
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700432 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
433 // Device group audit has not completed yet
434 // Add this group description to pending group key table
435 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700436 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700437 groupDesc.deviceId());
438 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
439 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800440 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700441 getPendingGroupKeyTable();
442 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
443 groupDesc.appCookie()),
444 group);
445 return;
446 }
447
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700448 Group matchingExtraneousGroup = null;
449 if (groupDesc.givenGroupId() != null) {
450 //Check if there is a extraneous group existing with the same Id
451 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
452 groupDesc.deviceId(), groupDesc.givenGroupId());
453 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800454 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
455 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700456 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800457 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700458 //Check if the group buckets matches with user provided buckets
459 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
460 //Group is already existing with the same buckets and Id
461 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800462 log.debug("storeGroupDescriptionInternal: Buckets also matching "
463 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700464 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800465 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700466 StoredGroupEntry group = new DefaultGroup(
467 matchingExtraneousGroup.id(), groupDesc);
468 // Insert the newly created group entry into key and id maps
469 getGroupStoreKeyMap().
470 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
471 groupDesc.appCookie()), group);
472 // Ensure it also inserted into group id based table to
473 // avoid any chances of duplication in group id generation
474 getGroupIdTable(groupDesc.deviceId()).
475 put(matchingExtraneousGroup.id(), group);
476 addOrUpdateGroupEntry(matchingExtraneousGroup);
477 removeExtraneousGroupEntry(matchingExtraneousGroup);
478 return;
479 } else {
480 //Group buckets are not matching. Update group
481 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800482 log.debug("storeGroupDescriptionInternal: Buckets are not "
483 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700484 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800485 Integer.toHexString(groupDesc.givenGroupId()));
486 StoredGroupEntry modifiedGroup = new DefaultGroup(
487 matchingExtraneousGroup.id(), groupDesc);
488 modifiedGroup.setState(GroupState.PENDING_UPDATE);
489 getGroupStoreKeyMap().
490 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
491 groupDesc.appCookie()), modifiedGroup);
492 // Ensure it also inserted into group id based table to
493 // avoid any chances of duplication in group id generation
494 getGroupIdTable(groupDesc.deviceId()).
495 put(matchingExtraneousGroup.id(), modifiedGroup);
496 removeExtraneousGroupEntry(matchingExtraneousGroup);
497 log.debug("storeGroupDescriptionInternal: Triggering Group "
498 + "UPDATE request for {} in device {}",
499 matchingExtraneousGroup.id(),
500 groupDesc.deviceId());
501 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
502 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700503 }
504 }
505 } else {
506 //Check if there is an extraneous group with user provided buckets
507 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
508 groupDesc.deviceId(), groupDesc.buckets());
509 if (matchingExtraneousGroup != null) {
510 //Group is already existing with the same buckets.
511 //So reuse this group.
512 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
513 groupDesc.deviceId());
514 //Create a group entry object
515 StoredGroupEntry group = new DefaultGroup(
516 matchingExtraneousGroup.id(), groupDesc);
517 // Insert the newly created group entry into key and id maps
518 getGroupStoreKeyMap().
519 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
520 groupDesc.appCookie()), group);
521 // Ensure it also inserted into group id based table to
522 // avoid any chances of duplication in group id generation
523 getGroupIdTable(groupDesc.deviceId()).
524 put(matchingExtraneousGroup.id(), group);
525 addOrUpdateGroupEntry(matchingExtraneousGroup);
526 removeExtraneousGroupEntry(matchingExtraneousGroup);
527 return;
528 } else {
529 //TODO: Check if there are any empty groups that can be used here
530 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
531 groupDesc.deviceId());
532 }
533 }
534
Saurav Das100e3b82015-04-30 11:12:10 -0700535 GroupId id = null;
536 if (groupDesc.givenGroupId() == null) {
537 // Get a new group identifier
538 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
539 } else {
540 id = new DefaultGroupId(groupDesc.givenGroupId());
541 }
alshabib10580802015-02-18 18:30:33 -0800542 // Create a group entry object
543 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700544 // Insert the newly created group entry into key and id maps
545 getGroupStoreKeyMap().
546 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
547 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700548 // Ensure it also inserted into group id based table to
549 // avoid any chances of duplication in group id generation
550 getGroupIdTable(groupDesc.deviceId()).
551 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700552 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
553 id,
554 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800555 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
556 group));
557 }
558
559 /**
560 * Updates the existing group entry with the information
561 * from group description.
562 *
563 * @param deviceId the device ID
564 * @param oldAppCookie the current group key
565 * @param type update type
566 * @param newBuckets group buckets for updates
567 * @param newAppCookie optional new group key
568 */
569 @Override
570 public void updateGroupDescription(DeviceId deviceId,
571 GroupKey oldAppCookie,
572 UpdateType type,
573 GroupBuckets newBuckets,
574 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700575 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700576 if (mastershipService.getMasterFor(deviceId) != null &&
577 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700578 log.debug("updateGroupDescription: Device {} local role is not MASTER",
579 deviceId);
580 if (mastershipService.getMasterFor(deviceId) == null) {
581 log.error("No Master for device {}..."
582 + "Can not perform update group operation",
583 deviceId);
584 //TODO: Send Group operation failure event
585 return;
586 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700587 GroupStoreMessage groupOp = GroupStoreMessage.
588 createGroupUpdateRequestMsg(deviceId,
589 oldAppCookie,
590 type,
591 newBuckets,
592 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700593
Madan Jampani175e8fd2015-05-20 14:10:45 -0700594 clusterCommunicator.unicast(groupOp,
595 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700596 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700597 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
598 if (error != null) {
599 log.warn("Failed to send request to master: {} to {}",
600 groupOp,
601 mastershipService.getMasterFor(deviceId), error);
602 }
603 //TODO: Send Group operation failure event
604 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700605 return;
606 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700607 log.debug("updateGroupDescription for device {} is getting handled locally",
608 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700609 updateGroupDescriptionInternal(deviceId,
610 oldAppCookie,
611 type,
612 newBuckets,
613 newAppCookie);
614 }
615
616 private void updateGroupDescriptionInternal(DeviceId deviceId,
617 GroupKey oldAppCookie,
618 UpdateType type,
619 GroupBuckets newBuckets,
620 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800621 // Check if a group is existing with the provided key
622 Group oldGroup = getGroup(deviceId, oldAppCookie);
623 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700624 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800625 return;
626 }
627
628 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
629 type,
630 newBuckets);
631 if (newBucketList != null) {
632 // Create a new group object from the old group
633 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
634 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
635 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
636 oldGroup.deviceId(),
637 oldGroup.type(),
638 updatedBuckets,
639 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700640 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800641 oldGroup.appId());
642 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
643 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700644 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
645 oldGroup.id(),
646 oldGroup.deviceId(),
647 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800648 newGroup.setState(GroupState.PENDING_UPDATE);
649 newGroup.setLife(oldGroup.life());
650 newGroup.setPackets(oldGroup.packets());
651 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700652 //Update the group entry in groupkey based map.
653 //Update to groupid based map will happen in the
654 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700655 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
656 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700657 getGroupStoreKeyMap().
658 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
659 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800660 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700661 } else {
662 log.warn("updateGroupDescriptionInternal with type {}: No "
663 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800664 }
665 }
666
667 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
668 UpdateType type,
669 GroupBuckets buckets) {
670 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700671 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800672 boolean groupDescUpdated = false;
673
674 if (type == UpdateType.ADD) {
675 // Check if the any of the new buckets are part of
676 // the old bucket list
677 for (GroupBucket addBucket:buckets.buckets()) {
678 if (!newBucketList.contains(addBucket)) {
679 newBucketList.add(addBucket);
680 groupDescUpdated = true;
681 }
682 }
683 } else if (type == UpdateType.REMOVE) {
684 // Check if the to be removed buckets are part of the
685 // old bucket list
686 for (GroupBucket removeBucket:buckets.buckets()) {
687 if (newBucketList.contains(removeBucket)) {
688 newBucketList.remove(removeBucket);
689 groupDescUpdated = true;
690 }
691 }
692 }
693
694 if (groupDescUpdated) {
695 return newBucketList;
696 } else {
697 return null;
698 }
699 }
700
701 /**
702 * Triggers deleting the existing group entry.
703 *
704 * @param deviceId the device ID
705 * @param appCookie the group key
706 */
707 @Override
708 public void deleteGroupDescription(DeviceId deviceId,
709 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700710 // Check if group to be deleted by a remote instance
711 if (mastershipService.
712 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700713 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
714 deviceId);
715 if (mastershipService.getMasterFor(deviceId) == null) {
716 log.error("No Master for device {}..."
717 + "Can not perform delete group operation",
718 deviceId);
719 //TODO: Send Group operation failure event
720 return;
721 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700722 GroupStoreMessage groupOp = GroupStoreMessage.
723 createGroupDeleteRequestMsg(deviceId,
724 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700725
Madan Jampani175e8fd2015-05-20 14:10:45 -0700726 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700727 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700728 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700729 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
730 if (error != null) {
731 log.warn("Failed to send request to master: {} to {}",
732 groupOp,
733 mastershipService.getMasterFor(deviceId), error);
734 }
735 //TODO: Send Group operation failure event
736 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700737 return;
738 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700739 log.debug("deleteGroupDescription in device {} is getting handled locally",
740 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700741 deleteGroupDescriptionInternal(deviceId, appCookie);
742 }
743
744 private void deleteGroupDescriptionInternal(DeviceId deviceId,
745 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800746 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700747 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800748 if (existing == null) {
749 return;
750 }
751
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700752 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
753 existing.id(),
754 existing.deviceId(),
755 existing.state());
alshabib10580802015-02-18 18:30:33 -0800756 synchronized (existing) {
757 existing.setState(GroupState.PENDING_DELETE);
758 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700759 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
760 deviceId);
alshabib10580802015-02-18 18:30:33 -0800761 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
762 }
763
764 /**
765 * Stores a new group entry, or updates an existing entry.
766 *
767 * @param group group entry
768 */
769 @Override
770 public void addOrUpdateGroupEntry(Group group) {
771 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700772 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
773 group.id());
alshabib10580802015-02-18 18:30:33 -0800774 GroupEvent event = null;
775
776 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800777 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700778 group.id(),
779 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800780 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700781 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700782 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700783 existing.buckets().buckets()
784 .stream()
785 .filter((existingBucket)->(existingBucket.equals(bucket)))
786 .findFirst();
787 if (matchingBucket.isPresent()) {
788 ((StoredGroupBucketEntry) matchingBucket.
789 get()).setPackets(bucket.packets());
790 ((StoredGroupBucketEntry) matchingBucket.
791 get()).setBytes(bucket.bytes());
792 } else {
793 log.warn("addOrUpdateGroupEntry: No matching "
794 + "buckets to update stats");
795 }
796 }
alshabib10580802015-02-18 18:30:33 -0800797 existing.setLife(group.life());
798 existing.setPackets(group.packets());
799 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700800 if ((existing.state() == GroupState.PENDING_ADD) ||
801 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800802 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700803 existing.id(),
804 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700805 existing.state());
alshabib10580802015-02-18 18:30:33 -0800806 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700807 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800808 event = new GroupEvent(Type.GROUP_ADDED, existing);
809 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800810 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700811 existing.id(),
812 existing.deviceId(),
813 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700814 existing.setState(GroupState.ADDED);
815 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800816 event = new GroupEvent(Type.GROUP_UPDATED, existing);
817 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700818 //Re-PUT map entries to trigger map update events
819 getGroupStoreKeyMap().
820 put(new GroupStoreKeyMapKey(existing.deviceId(),
821 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800822 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700823 } else {
824 log.warn("addOrUpdateGroupEntry: Group update "
825 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800826 }
827
828 if (event != null) {
829 notifyDelegate(event);
830 }
831 }
832
833 /**
834 * Removes the group entry from store.
835 *
836 * @param group group entry
837 */
838 @Override
839 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
841 group.id());
alshabib10580802015-02-18 18:30:33 -0800842
843 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700844 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700845 group.id(),
846 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700847 //Removal from groupid based map will happen in the
848 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700849 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
850 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800851 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700852 } else {
853 log.warn("removeGroupEntry for {} in device{} is "
854 + "not existing in our maps",
855 group.id(),
856 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800857 }
858 }
859
860 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800861 public void purgeGroupEntry(DeviceId deviceId) {
862 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
863 new HashSet<>();
864
Madan Jampani0b847532016-03-03 13:44:15 -0800865 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -0800866 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
867 .forEach(entryPendingRemove::add);
868
869 entryPendingRemove.forEach(entry -> {
870 groupStoreEntriesByKey.remove(entry.getKey());
871 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
872 });
873 }
874
875 @Override
alshabib10580802015-02-18 18:30:33 -0800876 public void deviceInitialAuditCompleted(DeviceId deviceId,
877 boolean completed) {
878 synchronized (deviceAuditStatus) {
879 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700880 log.debug("AUDIT completed for device {}",
881 deviceId);
alshabib10580802015-02-18 18:30:33 -0800882 deviceAuditStatus.put(deviceId, true);
883 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700884 List<StoredGroupEntry> pendingGroupRequests =
885 getPendingGroupKeyTable().values()
886 .stream()
887 .filter(g-> g.deviceId().equals(deviceId))
888 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700889 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700890 deviceId,
891 pendingGroupRequests.size());
892 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800893 GroupDescription tmp = new DefaultGroupDescription(
894 group.deviceId(),
895 group.type(),
896 group.buckets(),
897 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700898 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800899 group.appId());
900 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700901 getPendingGroupKeyTable().
902 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800903 }
alshabib10580802015-02-18 18:30:33 -0800904 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700905 Boolean audited = deviceAuditStatus.get(deviceId);
906 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700907 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800908 deviceAuditStatus.put(deviceId, false);
909 }
910 }
911 }
912 }
913
914 @Override
915 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
916 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700917 Boolean audited = deviceAuditStatus.get(deviceId);
918 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800919 }
920 }
921
922 @Override
923 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
924
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700925 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
926 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800927
928 if (existing == null) {
929 log.warn("No group entry with ID {} found ", operation.groupId());
930 return;
931 }
932
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700933 log.warn("groupOperationFailed: group operation {} failed"
Saurav Das0fd79d92016-03-07 10:58:36 -0800934 + "for group {} in device {} with code {}",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700935 operation.opType(),
936 existing.id(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800937 existing.deviceId(),
938 operation.failureCode());
939 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
940 log.warn("Current extraneous groups in device:{} are: {}",
941 deviceId,
942 getExtraneousGroups(deviceId));
943 }
alshabib10580802015-02-18 18:30:33 -0800944 switch (operation.opType()) {
945 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700946 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700947 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
948 log.warn("groupOperationFailed: cleaningup "
949 + "group {} from store in device {}....",
950 existing.id(),
951 existing.deviceId());
952 //Removal from groupid based map will happen in the
953 //map update listener
954 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
955 existing.appCookie()));
956 }
alshabib10580802015-02-18 18:30:33 -0800957 break;
958 case MODIFY:
959 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
960 break;
961 case DELETE:
962 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
963 break;
964 default:
965 log.warn("Unknown group operation type {}", operation.opType());
966 }
alshabib10580802015-02-18 18:30:33 -0800967 }
968
969 @Override
970 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700971 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700972 group.id(),
973 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800974 ConcurrentMap<GroupId, Group> extraneousIdTable =
975 getExtraneousGroupIdTable(group.deviceId());
976 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700977 // Don't remove the extraneous groups, instead re-use it when
978 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800979 }
980
981 @Override
982 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700983 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700984 group.id(),
985 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800986 ConcurrentMap<GroupId, Group> extraneousIdTable =
987 getExtraneousGroupIdTable(group.deviceId());
988 extraneousIdTable.remove(group.id());
989 }
990
991 @Override
992 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
993 // flatten and make iterator unmodifiable
994 return FluentIterable.from(
995 getExtraneousGroupIdTable(deviceId).values());
996 }
997
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700998 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700999 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001000 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001001 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001002 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001003
1004 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001005 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001006 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001007 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001008 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001009 if ((key == null) && (group == null)) {
1010 log.error("GroupStoreKeyMapListener: Received "
1011 + "event {} with null entry", mapEvent.type());
1012 return;
1013 } else if (group == null) {
1014 group = getGroupIdTable(key.deviceId()).values()
1015 .stream()
1016 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1017 .findFirst().get();
1018 if (group == null) {
1019 log.error("GroupStoreKeyMapListener: Received "
1020 + "event {} with null entry... can not process", mapEvent.type());
1021 return;
1022 }
1023 }
1024 log.trace("received groupid map event {} for id {} in device {}",
1025 mapEvent.type(),
1026 group.id(),
1027 key.deviceId());
Madan Jampani0b847532016-03-03 13:44:15 -08001028 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001029 // Update the group ID table
1030 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001031 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1032 if (value.state() == Group.GroupState.ADDED) {
1033 if (value.isGroupStateAddedFirstTime()) {
1034 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001035 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1036 group.id(),
1037 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001038 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001039 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001040 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1041 group.id(),
1042 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001043 }
1044 }
Madan Jampani0b847532016-03-03 13:44:15 -08001045 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001046 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001047 // Remove the entry from the group ID table
1048 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001049 }
1050
1051 if (groupEvent != null) {
1052 notifyDelegate(groupEvent);
1053 }
1054 }
1055 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001056
1057 private void process(GroupStoreMessage groupOp) {
1058 log.debug("Received remote group operation {} request for device {}",
1059 groupOp.type(),
1060 groupOp.deviceId());
1061 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1062 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1063 return;
1064 }
1065 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1066 storeGroupDescriptionInternal(groupOp.groupDesc());
1067 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1068 updateGroupDescriptionInternal(groupOp.deviceId(),
1069 groupOp.appCookie(),
1070 groupOp.updateType(),
1071 groupOp.updateBuckets(),
1072 groupOp.newAppCookie());
1073 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1074 deleteGroupDescriptionInternal(groupOp.deviceId(),
1075 groupOp.appCookie());
1076 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001077 }
1078
1079 /**
1080 * Flattened map key to be used to store group entries.
1081 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001082 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001083 private final DeviceId deviceId;
1084
1085 public GroupStoreMapKey(DeviceId deviceId) {
1086 this.deviceId = deviceId;
1087 }
1088
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001089 public DeviceId deviceId() {
1090 return deviceId;
1091 }
1092
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001093 @Override
1094 public boolean equals(Object o) {
1095 if (this == o) {
1096 return true;
1097 }
1098 if (!(o instanceof GroupStoreMapKey)) {
1099 return false;
1100 }
1101 GroupStoreMapKey that = (GroupStoreMapKey) o;
1102 return this.deviceId.equals(that.deviceId);
1103 }
1104
1105 @Override
1106 public int hashCode() {
1107 int result = 17;
1108
1109 result = 31 * result + Objects.hash(this.deviceId);
1110
1111 return result;
1112 }
1113 }
1114
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001115 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001116 private final GroupKey appCookie;
1117 public GroupStoreKeyMapKey(DeviceId deviceId,
1118 GroupKey appCookie) {
1119 super(deviceId);
1120 this.appCookie = appCookie;
1121 }
1122
1123 @Override
1124 public boolean equals(Object o) {
1125 if (this == o) {
1126 return true;
1127 }
1128 if (!(o instanceof GroupStoreKeyMapKey)) {
1129 return false;
1130 }
1131 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1132 return (super.equals(that) &&
1133 this.appCookie.equals(that.appCookie));
1134 }
1135
1136 @Override
1137 public int hashCode() {
1138 int result = 17;
1139
1140 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1141
1142 return result;
1143 }
1144 }
1145
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001146 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001147 private final GroupId groupId;
1148 public GroupStoreIdMapKey(DeviceId deviceId,
1149 GroupId groupId) {
1150 super(deviceId);
1151 this.groupId = groupId;
1152 }
1153
1154 @Override
1155 public boolean equals(Object o) {
1156 if (this == o) {
1157 return true;
1158 }
1159 if (!(o instanceof GroupStoreIdMapKey)) {
1160 return false;
1161 }
1162 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1163 return (super.equals(that) &&
1164 this.groupId.equals(that.groupId));
1165 }
1166
1167 @Override
1168 public int hashCode() {
1169 int result = 17;
1170
1171 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1172
1173 return result;
1174 }
1175 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001176
1177 @Override
1178 public void pushGroupMetrics(DeviceId deviceId,
1179 Collection<Group> groupEntries) {
1180 boolean deviceInitialAuditStatus =
1181 deviceInitialAuditStatus(deviceId);
1182 Set<Group> southboundGroupEntries =
1183 Sets.newHashSet(groupEntries);
1184 Set<StoredGroupEntry> storedGroupEntries =
1185 Sets.newHashSet(getStoredGroups(deviceId));
1186 Set<Group> extraneousStoredEntries =
1187 Sets.newHashSet(getExtraneousGroups(deviceId));
1188
1189 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1190 southboundGroupEntries.size(),
1191 deviceId);
1192 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1193 Group group = it.next();
1194 log.trace("Group {} in device {}", group, deviceId);
1195 }
1196
1197 log.trace("Displaying all ({}) stored group entries for device {}",
1198 storedGroupEntries.size(),
1199 deviceId);
1200 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1201 it1.hasNext();) {
1202 Group group = it1.next();
1203 log.trace("Stored Group {} for device {}", group, deviceId);
1204 }
1205
1206 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1207 Group group = it2.next();
1208 if (storedGroupEntries.remove(group)) {
1209 // we both have the group, let's update some info then.
1210 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1211 group.id(), deviceId);
1212 groupAdded(group);
1213 it2.remove();
1214 }
1215 }
1216 for (Group group : southboundGroupEntries) {
1217 if (getGroup(group.deviceId(), group.id()) != null) {
1218 // There is a group existing with the same id
1219 // It is possible that group update is
1220 // in progress while we got a stale info from switch
1221 if (!storedGroupEntries.remove(getGroup(
1222 group.deviceId(), group.id()))) {
1223 log.warn("Group AUDIT: Inconsistent state:"
1224 + "Group exists in ID based table while "
1225 + "not present in key based table");
1226 }
1227 } else {
1228 // there are groups in the switch that aren't in the store
1229 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1230 group.id(), deviceId);
1231 extraneousStoredEntries.remove(group);
1232 extraneousGroup(group);
1233 }
1234 }
1235 for (Group group : storedGroupEntries) {
1236 // there are groups in the store that aren't in the switch
1237 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1238 group.id(), deviceId);
1239 groupMissing(group);
1240 }
1241 for (Group group : extraneousStoredEntries) {
1242 // there are groups in the extraneous store that
1243 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001244 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001245 group.id(), deviceId);
1246 removeExtraneousGroupEntry(group);
1247 }
1248
1249 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001250 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001251 deviceId);
1252 deviceInitialAuditCompleted(deviceId, true);
1253 }
1254 }
1255
1256 private void groupMissing(Group group) {
1257 switch (group.state()) {
1258 case PENDING_DELETE:
1259 log.debug("Group {} delete confirmation from device {}",
1260 group, group.deviceId());
1261 removeGroupEntry(group);
1262 break;
1263 case ADDED:
1264 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001265 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001266 case PENDING_UPDATE:
1267 log.debug("Group {} is in store but not on device {}",
1268 group, group.deviceId());
1269 StoredGroupEntry existing =
1270 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001271 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001272 existing.id(),
1273 existing.deviceId(),
1274 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001275 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001276 //Re-PUT map entries to trigger map update events
1277 getGroupStoreKeyMap().
1278 put(new GroupStoreKeyMapKey(existing.deviceId(),
1279 existing.appCookie()), existing);
1280 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1281 group));
1282 break;
1283 default:
1284 log.debug("Group {} has not been installed.", group);
1285 break;
1286 }
1287 }
1288
1289 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001290 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001291 group, group.deviceId());
1292 addOrUpdateExtraneousGroupEntry(group);
1293 }
1294
1295 private void groupAdded(Group group) {
1296 log.trace("Group {} Added or Updated in device {}",
1297 group, group.deviceId());
1298 addOrUpdateGroupEntry(group);
1299 }
alshabib10580802015-02-18 18:30:33 -08001300}