blob: 64b7c87988d1a5e03ebe84dd26e44e4eb965a744 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070022
alshabib10580802015-02-18 18:30:33 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080028import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070029import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080030import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070031import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080032import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.core.DefaultGroupId;
34import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070057import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080059import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.MapEventListener;
62import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070063import org.onosproject.store.service.StorageService;
Madan Jampani0b847532016-03-03 13:44:15 -080064import org.onosproject.store.service.Versioned;
alshabib10580802015-02-18 18:30:33 -080065import org.slf4j.Logger;
66
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070068import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080069import java.util.Collections;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080071import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070072import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080074import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080075import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070076import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070077import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070078import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070079import java.util.concurrent.ConcurrentHashMap;
80import java.util.concurrent.ConcurrentMap;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
83import java.util.concurrent.atomic.AtomicInteger;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070084import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
/**
 * Manages inventory of group entries using a distributed implementation
 * backed by consistent maps and cluster messaging.
 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 protected StorageService storageService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700114 protected MastershipService mastershipService;
115
116 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800117 private ConsistentMap<GroupStoreKeyMapKey,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700118 StoredGroupEntry> groupStoreEntriesByKey = null;
119 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700120 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
121 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800122 private ConsistentMap<GroupStoreKeyMapKey,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700123 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
125 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private ExecutorService messageHandlingExecutor;
127 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800128
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700129 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800130
131 private final AtomicInteger groupIdGen = new AtomicInteger();
132
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700133 private KryoNamespace.Builder kryoBuilder = null;
134
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700135 private KryoNamespace clusterMsgSerializer;
136
alshabib10580802015-02-18 18:30:33 -0800137 @Activate
138 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700139 kryoBuilder = new KryoNamespace.Builder()
Charles Chan138cd5a2015-09-29 16:57:41 -0700140 .register(KryoNamespaces.API)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700141 .register(DefaultGroup.class,
142 DefaultGroupBucket.class,
143 DefaultGroupDescription.class,
144 DefaultGroupKey.class,
145 GroupDescription.Type.class,
146 Group.GroupState.class,
147 GroupBuckets.class,
148 DefaultGroupId.class,
149 GroupStoreMessage.class,
150 GroupStoreMessage.Type.class,
151 UpdateType.class,
152 GroupStoreMessageSubjects.class,
153 MultiValuedTimestamp.class,
154 GroupStoreKeyMapKey.class,
155 GroupStoreIdMapKey.class,
156 GroupStoreMapKey.class
Charles Chan138cd5a2015-09-29 16:57:41 -0700157 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700158
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700159 clusterMsgSerializer = kryoBuilder.build();
160
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700161 messageHandlingExecutor = Executors.
162 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
163 groupedThreads("onos/store/group",
164 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700165
166 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700167 clusterMsgSerializer::deserialize,
Madan Jampani01e05fb2015-08-13 13:29:36 -0700168 this::process,
169 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700170
Madan Jampani0b847532016-03-03 13:44:15 -0800171 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700172
Madan Jampani0b847532016-03-03 13:44:15 -0800173 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
174 .withName("onos-group-store-keymap")
175 .withSerializer(Serializer.using(kryoBuilder.build()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700176 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700177 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700178 log.debug("Current size of groupstorekeymap:{}",
179 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700180
Madan Jampani0b847532016-03-03 13:44:15 -0800181 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700182
Madan Jampani0b847532016-03-03 13:44:15 -0800183 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
184 .withName("onos-pending-group-keymap")
185 .withSerializer(Serializer.using(kryoBuilder.build()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700186 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700187 log.debug("Current size of pendinggroupkeymap:{}",
188 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700189
alshabib10580802015-02-18 18:30:33 -0800190 log.info("Started");
191 }
192
193 @Deactivate
194 public void deactivate() {
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700195 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700196 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700197 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800198 log.info("Stopped");
199 }
200
alshabib10580802015-02-18 18:30:33 -0800201 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700202 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800203 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
204 }
205
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700206 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
207 lazyEmptyGroupIdTable() {
208 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
209 }
210
alshabib10580802015-02-18 18:30:33 -0800211 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700212 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800213 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700214 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800215 */
Madan Jampani0b847532016-03-03 13:44:15 -0800216 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700217 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800218 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800219 }
220
221 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700222 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800223 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700224 * @param deviceId identifier of the device
225 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800226 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700227 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
228 return createIfAbsentUnchecked(groupEntriesById,
229 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800230 }
231
232 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700233 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800234 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700235 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800236 */
Madan Jampani0b847532016-03-03 13:44:15 -0800237 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700238 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800239 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800240 }
241
242 /**
243 * Returns the extraneous group id table for specified device.
244 *
245 * @param deviceId identifier of the device
246 * @return Map representing group key table of given device.
247 */
248 private ConcurrentMap<GroupId, Group>
249 getExtraneousGroupIdTable(DeviceId deviceId) {
250 return createIfAbsentUnchecked(extraneousGroupEntriesById,
251 deviceId,
252 lazyEmptyExtraneousGroupIdTable());
253 }
254
255 /**
256 * Returns the number of groups for the specified device in the store.
257 *
258 * @return number of groups for the specified device
259 */
260 @Override
261 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700262 return (getGroups(deviceId) != null) ?
263 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800264 }
265
266 /**
267 * Returns the groups associated with a device.
268 *
269 * @param deviceId the device ID
270 *
271 * @return the group entries
272 */
273 @Override
274 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800275 // Let ImmutableSet.copyOf do the type conversion
276 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800277 }
278
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700279 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800280 NodeId master = mastershipService.getMasterFor(deviceId);
281 if (master == null) {
282 log.debug("Failed to getGroups: No master for {}", deviceId);
283 return Collections.emptySet();
284 }
285
286 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
287 .stream()
288 .filter(input -> input.deviceId().equals(deviceId))
289 .collect(Collectors.toSet());
290 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700291 }
292
alshabib10580802015-02-18 18:30:33 -0800293 /**
294 * Returns the stored group entry.
295 *
296 * @param deviceId the device ID
297 * @param appCookie the group key
298 *
299 * @return a group associated with the key
300 */
301 @Override
302 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700303 return getStoredGroupEntry(deviceId, appCookie);
304 }
305
306 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
307 GroupKey appCookie) {
308 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
309 appCookie));
310 }
311
312 @Override
313 public Group getGroup(DeviceId deviceId, GroupId groupId) {
314 return getStoredGroupEntry(deviceId, groupId);
315 }
316
317 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
318 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700319 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800320 }
321
322 private int getFreeGroupIdValue(DeviceId deviceId) {
323 int freeId = groupIdGen.incrementAndGet();
324
325 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700326 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800327 if (existing == null) {
328 existing = (
329 extraneousGroupEntriesById.get(deviceId) != null) ?
330 extraneousGroupEntriesById.get(deviceId).
331 get(new DefaultGroupId(freeId)) :
332 null;
333 }
334 if (existing != null) {
335 freeId = groupIdGen.incrementAndGet();
336 } else {
337 break;
338 }
339 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700340 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800341 return freeId;
342 }
343
344 /**
345 * Stores a new group entry using the information from group description.
346 *
347 * @param groupDesc group description to be used to create group entry
348 */
349 @Override
350 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700351 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800352 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800353 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
354 if (existingGroup != null) {
Saurav Das4ce45962015-11-24 23:21:05 -0800355 log.warn("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800356 groupDesc.appCookie(), groupDesc.deviceId(),
357 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800358 return;
359 }
360
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700361 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700362 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700363 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700364 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700365 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
366 log.error("No Master for device {}..."
367 + "Can not perform add group operation",
368 groupDesc.deviceId());
369 //TODO: Send Group operation failure event
370 return;
371 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700372 GroupStoreMessage groupOp = GroupStoreMessage.
373 createGroupAddRequestMsg(groupDesc.deviceId(),
374 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700375
Madan Jampani175e8fd2015-05-20 14:10:45 -0700376 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700377 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700378 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700379 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
380 if (error != null) {
381 log.warn("Failed to send request to master: {} to {}",
382 groupOp,
383 mastershipService.getMasterFor(groupDesc.deviceId()));
384 //TODO: Send Group operation failure event
385 } else {
386 log.debug("Sent Group operation request for device {} "
387 + "to remote MASTER {}",
388 groupDesc.deviceId(),
389 mastershipService.getMasterFor(groupDesc.deviceId()));
390 }
391 });
alshabib10580802015-02-18 18:30:33 -0800392 return;
393 }
394
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700395 log.debug("Store group for device {} is getting handled locally",
396 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800397 storeGroupDescriptionInternal(groupDesc);
398 }
399
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700400 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
401 ConcurrentMap<GroupId, Group> extraneousMap =
402 extraneousGroupEntriesById.get(deviceId);
403 if (extraneousMap == null) {
404 return null;
405 }
406 return extraneousMap.get(new DefaultGroupId(groupId));
407 }
408
409 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
410 GroupBuckets buckets) {
411 ConcurrentMap<GroupId, Group> extraneousMap =
412 extraneousGroupEntriesById.get(deviceId);
413 if (extraneousMap == null) {
414 return null;
415 }
416
417 for (Group extraneousGroup:extraneousMap.values()) {
418 if (extraneousGroup.buckets().equals(buckets)) {
419 return extraneousGroup;
420 }
421 }
422 return null;
423 }
424
alshabib10580802015-02-18 18:30:33 -0800425 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
426 // Check if a group is existing with the same key
427 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
428 return;
429 }
430
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700431 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
432 // Device group audit has not completed yet
433 // Add this group description to pending group key table
434 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700435 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700436 groupDesc.deviceId());
437 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
438 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800439 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700440 getPendingGroupKeyTable();
441 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
442 groupDesc.appCookie()),
443 group);
444 return;
445 }
446
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700447 Group matchingExtraneousGroup = null;
448 if (groupDesc.givenGroupId() != null) {
449 //Check if there is a extraneous group existing with the same Id
450 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
451 groupDesc.deviceId(), groupDesc.givenGroupId());
452 if (matchingExtraneousGroup != null) {
453 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
454 groupDesc.deviceId(),
455 groupDesc.givenGroupId());
456 //Check if the group buckets matches with user provided buckets
457 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
458 //Group is already existing with the same buckets and Id
459 // Create a group entry object
460 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
461 groupDesc.deviceId(),
462 groupDesc.givenGroupId());
463 StoredGroupEntry group = new DefaultGroup(
464 matchingExtraneousGroup.id(), groupDesc);
465 // Insert the newly created group entry into key and id maps
466 getGroupStoreKeyMap().
467 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
468 groupDesc.appCookie()), group);
469 // Ensure it also inserted into group id based table to
470 // avoid any chances of duplication in group id generation
471 getGroupIdTable(groupDesc.deviceId()).
472 put(matchingExtraneousGroup.id(), group);
473 addOrUpdateGroupEntry(matchingExtraneousGroup);
474 removeExtraneousGroupEntry(matchingExtraneousGroup);
475 return;
476 } else {
477 //Group buckets are not matching. Update group
478 //with user provided buckets.
479 //TODO
480 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
481 groupDesc.deviceId(),
482 groupDesc.givenGroupId());
483 }
484 }
485 } else {
486 //Check if there is an extraneous group with user provided buckets
487 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
488 groupDesc.deviceId(), groupDesc.buckets());
489 if (matchingExtraneousGroup != null) {
490 //Group is already existing with the same buckets.
491 //So reuse this group.
492 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
493 groupDesc.deviceId());
494 //Create a group entry object
495 StoredGroupEntry group = new DefaultGroup(
496 matchingExtraneousGroup.id(), groupDesc);
497 // Insert the newly created group entry into key and id maps
498 getGroupStoreKeyMap().
499 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
500 groupDesc.appCookie()), group);
501 // Ensure it also inserted into group id based table to
502 // avoid any chances of duplication in group id generation
503 getGroupIdTable(groupDesc.deviceId()).
504 put(matchingExtraneousGroup.id(), group);
505 addOrUpdateGroupEntry(matchingExtraneousGroup);
506 removeExtraneousGroupEntry(matchingExtraneousGroup);
507 return;
508 } else {
509 //TODO: Check if there are any empty groups that can be used here
510 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
511 groupDesc.deviceId());
512 }
513 }
514
Saurav Das100e3b82015-04-30 11:12:10 -0700515 GroupId id = null;
516 if (groupDesc.givenGroupId() == null) {
517 // Get a new group identifier
518 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
519 } else {
520 id = new DefaultGroupId(groupDesc.givenGroupId());
521 }
alshabib10580802015-02-18 18:30:33 -0800522 // Create a group entry object
523 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700524 // Insert the newly created group entry into key and id maps
525 getGroupStoreKeyMap().
526 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
527 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700528 // Ensure it also inserted into group id based table to
529 // avoid any chances of duplication in group id generation
530 getGroupIdTable(groupDesc.deviceId()).
531 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700532 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
533 id,
534 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800535 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
536 group));
537 }
538
539 /**
540 * Updates the existing group entry with the information
541 * from group description.
542 *
543 * @param deviceId the device ID
544 * @param oldAppCookie the current group key
545 * @param type update type
546 * @param newBuckets group buckets for updates
547 * @param newAppCookie optional new group key
548 */
549 @Override
550 public void updateGroupDescription(DeviceId deviceId,
551 GroupKey oldAppCookie,
552 UpdateType type,
553 GroupBuckets newBuckets,
554 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700555 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700556 if (mastershipService.getMasterFor(deviceId) != null &&
557 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700558 log.debug("updateGroupDescription: Device {} local role is not MASTER",
559 deviceId);
560 if (mastershipService.getMasterFor(deviceId) == null) {
561 log.error("No Master for device {}..."
562 + "Can not perform update group operation",
563 deviceId);
564 //TODO: Send Group operation failure event
565 return;
566 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700567 GroupStoreMessage groupOp = GroupStoreMessage.
568 createGroupUpdateRequestMsg(deviceId,
569 oldAppCookie,
570 type,
571 newBuckets,
572 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700573
Madan Jampani175e8fd2015-05-20 14:10:45 -0700574 clusterCommunicator.unicast(groupOp,
575 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700576 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700577 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
578 if (error != null) {
579 log.warn("Failed to send request to master: {} to {}",
580 groupOp,
581 mastershipService.getMasterFor(deviceId), error);
582 }
583 //TODO: Send Group operation failure event
584 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700585 return;
586 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700587 log.debug("updateGroupDescription for device {} is getting handled locally",
588 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700589 updateGroupDescriptionInternal(deviceId,
590 oldAppCookie,
591 type,
592 newBuckets,
593 newAppCookie);
594 }
595
596 private void updateGroupDescriptionInternal(DeviceId deviceId,
597 GroupKey oldAppCookie,
598 UpdateType type,
599 GroupBuckets newBuckets,
600 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800601 // Check if a group is existing with the provided key
602 Group oldGroup = getGroup(deviceId, oldAppCookie);
603 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700604 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800605 return;
606 }
607
608 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
609 type,
610 newBuckets);
611 if (newBucketList != null) {
612 // Create a new group object from the old group
613 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
614 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
615 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
616 oldGroup.deviceId(),
617 oldGroup.type(),
618 updatedBuckets,
619 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700620 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800621 oldGroup.appId());
622 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
623 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700624 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
625 oldGroup.id(),
626 oldGroup.deviceId(),
627 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800628 newGroup.setState(GroupState.PENDING_UPDATE);
629 newGroup.setLife(oldGroup.life());
630 newGroup.setPackets(oldGroup.packets());
631 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700632 //Update the group entry in groupkey based map.
633 //Update to groupid based map will happen in the
634 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700635 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
636 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700637 getGroupStoreKeyMap().
638 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
639 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800640 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700641 } else {
642 log.warn("updateGroupDescriptionInternal with type {}: No "
643 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800644 }
645 }
646
647 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
648 UpdateType type,
649 GroupBuckets buckets) {
650 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700651 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800652 boolean groupDescUpdated = false;
653
654 if (type == UpdateType.ADD) {
655 // Check if the any of the new buckets are part of
656 // the old bucket list
657 for (GroupBucket addBucket:buckets.buckets()) {
658 if (!newBucketList.contains(addBucket)) {
659 newBucketList.add(addBucket);
660 groupDescUpdated = true;
661 }
662 }
663 } else if (type == UpdateType.REMOVE) {
664 // Check if the to be removed buckets are part of the
665 // old bucket list
666 for (GroupBucket removeBucket:buckets.buckets()) {
667 if (newBucketList.contains(removeBucket)) {
668 newBucketList.remove(removeBucket);
669 groupDescUpdated = true;
670 }
671 }
672 }
673
674 if (groupDescUpdated) {
675 return newBucketList;
676 } else {
677 return null;
678 }
679 }
680
681 /**
682 * Triggers deleting the existing group entry.
683 *
684 * @param deviceId the device ID
685 * @param appCookie the group key
686 */
687 @Override
688 public void deleteGroupDescription(DeviceId deviceId,
689 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700690 // Check if group to be deleted by a remote instance
691 if (mastershipService.
692 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700693 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
694 deviceId);
695 if (mastershipService.getMasterFor(deviceId) == null) {
696 log.error("No Master for device {}..."
697 + "Can not perform delete group operation",
698 deviceId);
699 //TODO: Send Group operation failure event
700 return;
701 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700702 GroupStoreMessage groupOp = GroupStoreMessage.
703 createGroupDeleteRequestMsg(deviceId,
704 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700705
Madan Jampani175e8fd2015-05-20 14:10:45 -0700706 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700707 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700708 clusterMsgSerializer::serialize,
Madan Jampani175e8fd2015-05-20 14:10:45 -0700709 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
710 if (error != null) {
711 log.warn("Failed to send request to master: {} to {}",
712 groupOp,
713 mastershipService.getMasterFor(deviceId), error);
714 }
715 //TODO: Send Group operation failure event
716 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700717 return;
718 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700719 log.debug("deleteGroupDescription in device {} is getting handled locally",
720 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700721 deleteGroupDescriptionInternal(deviceId, appCookie);
722 }
723
724 private void deleteGroupDescriptionInternal(DeviceId deviceId,
725 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800726 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700727 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800728 if (existing == null) {
729 return;
730 }
731
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700732 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
733 existing.id(),
734 existing.deviceId(),
735 existing.state());
alshabib10580802015-02-18 18:30:33 -0800736 synchronized (existing) {
737 existing.setState(GroupState.PENDING_DELETE);
738 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700739 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
740 deviceId);
alshabib10580802015-02-18 18:30:33 -0800741 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
742 }
743
744 /**
745 * Stores a new group entry, or updates an existing entry.
746 *
747 * @param group group entry
748 */
749 @Override
750 public void addOrUpdateGroupEntry(Group group) {
751 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700752 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
753 group.id());
alshabib10580802015-02-18 18:30:33 -0800754 GroupEvent event = null;
755
756 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700757 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700758 group.id(),
759 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800760 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700761 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700762 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700763 existing.buckets().buckets()
764 .stream()
765 .filter((existingBucket)->(existingBucket.equals(bucket)))
766 .findFirst();
767 if (matchingBucket.isPresent()) {
768 ((StoredGroupBucketEntry) matchingBucket.
769 get()).setPackets(bucket.packets());
770 ((StoredGroupBucketEntry) matchingBucket.
771 get()).setBytes(bucket.bytes());
772 } else {
773 log.warn("addOrUpdateGroupEntry: No matching "
774 + "buckets to update stats");
775 }
776 }
alshabib10580802015-02-18 18:30:33 -0800777 existing.setLife(group.life());
778 existing.setPackets(group.packets());
779 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700780 if ((existing.state() == GroupState.PENDING_ADD) ||
781 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700782 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
783 existing.id(),
784 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700785 existing.state());
alshabib10580802015-02-18 18:30:33 -0800786 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700787 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800788 event = new GroupEvent(Type.GROUP_ADDED, existing);
789 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700790 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
791 existing.id(),
792 existing.deviceId(),
793 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700794 existing.setState(GroupState.ADDED);
795 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800796 event = new GroupEvent(Type.GROUP_UPDATED, existing);
797 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700798 //Re-PUT map entries to trigger map update events
799 getGroupStoreKeyMap().
800 put(new GroupStoreKeyMapKey(existing.deviceId(),
801 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800802 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700803 } else {
804 log.warn("addOrUpdateGroupEntry: Group update "
805 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800806 }
807
808 if (event != null) {
809 notifyDelegate(event);
810 }
811 }
812
813 /**
814 * Removes the group entry from store.
815 *
816 * @param group group entry
817 */
818 @Override
819 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700820 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
821 group.id());
alshabib10580802015-02-18 18:30:33 -0800822
823 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700824 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700825 group.id(),
826 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700827 //Removal from groupid based map will happen in the
828 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700829 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
830 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800831 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700832 } else {
833 log.warn("removeGroupEntry for {} in device{} is "
834 + "not existing in our maps",
835 group.id(),
836 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800837 }
838 }
839
840 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800841 public void purgeGroupEntry(DeviceId deviceId) {
842 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
843 new HashSet<>();
844
Madan Jampani0b847532016-03-03 13:44:15 -0800845 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -0800846 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
847 .forEach(entryPendingRemove::add);
848
849 entryPendingRemove.forEach(entry -> {
850 groupStoreEntriesByKey.remove(entry.getKey());
851 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
852 });
853 }
854
855 @Override
alshabib10580802015-02-18 18:30:33 -0800856 public void deviceInitialAuditCompleted(DeviceId deviceId,
857 boolean completed) {
858 synchronized (deviceAuditStatus) {
859 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700860 log.debug("AUDIT completed for device {}",
861 deviceId);
alshabib10580802015-02-18 18:30:33 -0800862 deviceAuditStatus.put(deviceId, true);
863 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700864 List<StoredGroupEntry> pendingGroupRequests =
865 getPendingGroupKeyTable().values()
866 .stream()
867 .filter(g-> g.deviceId().equals(deviceId))
868 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700869 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700870 deviceId,
871 pendingGroupRequests.size());
872 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800873 GroupDescription tmp = new DefaultGroupDescription(
874 group.deviceId(),
875 group.type(),
876 group.buckets(),
877 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700878 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800879 group.appId());
880 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700881 getPendingGroupKeyTable().
882 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800883 }
alshabib10580802015-02-18 18:30:33 -0800884 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700885 Boolean audited = deviceAuditStatus.get(deviceId);
886 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700887 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800888 deviceAuditStatus.put(deviceId, false);
889 }
890 }
891 }
892 }
893
894 @Override
895 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
896 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700897 Boolean audited = deviceAuditStatus.get(deviceId);
898 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800899 }
900 }
901
902 @Override
903 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
904
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700905 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
906 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800907
908 if (existing == null) {
909 log.warn("No group entry with ID {} found ", operation.groupId());
910 return;
911 }
912
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700913 log.warn("groupOperationFailed: group operation {} failed"
914 + "for group {} in device {}",
915 operation.opType(),
916 existing.id(),
917 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800918 switch (operation.opType()) {
919 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700920 if (existing.state() == GroupState.PENDING_ADD) {
921 //TODO: Need to add support for passing the group
922 //operation failure reason from group provider.
923 //If the error type is anything other than GROUP_EXISTS,
924 //then the GROUP_ADD_FAILED event should be raised even
925 //in PENDING_ADD_RETRY state also.
926 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
927 log.warn("groupOperationFailed: cleaningup "
928 + "group {} from store in device {}....",
929 existing.id(),
930 existing.deviceId());
931 //Removal from groupid based map will happen in the
932 //map update listener
933 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
934 existing.appCookie()));
935 }
alshabib10580802015-02-18 18:30:33 -0800936 break;
937 case MODIFY:
938 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
939 break;
940 case DELETE:
941 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
942 break;
943 default:
944 log.warn("Unknown group operation type {}", operation.opType());
945 }
alshabib10580802015-02-18 18:30:33 -0800946 }
947
948 @Override
949 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700950 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700951 group.id(),
952 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800953 ConcurrentMap<GroupId, Group> extraneousIdTable =
954 getExtraneousGroupIdTable(group.deviceId());
955 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700956 // Don't remove the extraneous groups, instead re-use it when
957 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800958 }
959
960 @Override
961 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700962 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700963 group.id(),
964 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800965 ConcurrentMap<GroupId, Group> extraneousIdTable =
966 getExtraneousGroupIdTable(group.deviceId());
967 extraneousIdTable.remove(group.id());
968 }
969
970 @Override
971 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
972 // flatten and make iterator unmodifiable
973 return FluentIterable.from(
974 getExtraneousGroupIdTable(deviceId).values());
975 }
976
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700977 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700978 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700979 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700980 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -0800981 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700982
983 @Override
Madan Jampani0b847532016-03-03 13:44:15 -0800984 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700985 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700986 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -0800987 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700988 if ((key == null) && (group == null)) {
989 log.error("GroupStoreKeyMapListener: Received "
990 + "event {} with null entry", mapEvent.type());
991 return;
992 } else if (group == null) {
993 group = getGroupIdTable(key.deviceId()).values()
994 .stream()
995 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
996 .findFirst().get();
997 if (group == null) {
998 log.error("GroupStoreKeyMapListener: Received "
999 + "event {} with null entry... can not process", mapEvent.type());
1000 return;
1001 }
1002 }
1003 log.trace("received groupid map event {} for id {} in device {}",
1004 mapEvent.type(),
1005 group.id(),
1006 key.deviceId());
Madan Jampani0b847532016-03-03 13:44:15 -08001007 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001008 // Update the group ID table
1009 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001010 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1011 if (value.state() == Group.GroupState.ADDED) {
1012 if (value.isGroupStateAddedFirstTime()) {
1013 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001014 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1015 group.id(),
1016 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001017 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001018 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001019 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1020 group.id(),
1021 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001022 }
1023 }
Madan Jampani0b847532016-03-03 13:44:15 -08001024 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001025 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001026 // Remove the entry from the group ID table
1027 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001028 }
1029
1030 if (groupEvent != null) {
1031 notifyDelegate(groupEvent);
1032 }
1033 }
1034 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001035
1036 private void process(GroupStoreMessage groupOp) {
1037 log.debug("Received remote group operation {} request for device {}",
1038 groupOp.type(),
1039 groupOp.deviceId());
1040 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1041 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1042 return;
1043 }
1044 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1045 storeGroupDescriptionInternal(groupOp.groupDesc());
1046 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1047 updateGroupDescriptionInternal(groupOp.deviceId(),
1048 groupOp.appCookie(),
1049 groupOp.updateType(),
1050 groupOp.updateBuckets(),
1051 groupOp.newAppCookie());
1052 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1053 deleteGroupDescriptionInternal(groupOp.deviceId(),
1054 groupOp.appCookie());
1055 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001056 }
1057
1058 /**
1059 * Flattened map key to be used to store group entries.
1060 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001061 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001062 private final DeviceId deviceId;
1063
1064 public GroupStoreMapKey(DeviceId deviceId) {
1065 this.deviceId = deviceId;
1066 }
1067
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001068 public DeviceId deviceId() {
1069 return deviceId;
1070 }
1071
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001072 @Override
1073 public boolean equals(Object o) {
1074 if (this == o) {
1075 return true;
1076 }
1077 if (!(o instanceof GroupStoreMapKey)) {
1078 return false;
1079 }
1080 GroupStoreMapKey that = (GroupStoreMapKey) o;
1081 return this.deviceId.equals(that.deviceId);
1082 }
1083
1084 @Override
1085 public int hashCode() {
1086 int result = 17;
1087
1088 result = 31 * result + Objects.hash(this.deviceId);
1089
1090 return result;
1091 }
1092 }
1093
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001094 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001095 private final GroupKey appCookie;
1096 public GroupStoreKeyMapKey(DeviceId deviceId,
1097 GroupKey appCookie) {
1098 super(deviceId);
1099 this.appCookie = appCookie;
1100 }
1101
1102 @Override
1103 public boolean equals(Object o) {
1104 if (this == o) {
1105 return true;
1106 }
1107 if (!(o instanceof GroupStoreKeyMapKey)) {
1108 return false;
1109 }
1110 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1111 return (super.equals(that) &&
1112 this.appCookie.equals(that.appCookie));
1113 }
1114
1115 @Override
1116 public int hashCode() {
1117 int result = 17;
1118
1119 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1120
1121 return result;
1122 }
1123 }
1124
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001125 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001126 private final GroupId groupId;
1127 public GroupStoreIdMapKey(DeviceId deviceId,
1128 GroupId groupId) {
1129 super(deviceId);
1130 this.groupId = groupId;
1131 }
1132
1133 @Override
1134 public boolean equals(Object o) {
1135 if (this == o) {
1136 return true;
1137 }
1138 if (!(o instanceof GroupStoreIdMapKey)) {
1139 return false;
1140 }
1141 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1142 return (super.equals(that) &&
1143 this.groupId.equals(that.groupId));
1144 }
1145
1146 @Override
1147 public int hashCode() {
1148 int result = 17;
1149
1150 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1151
1152 return result;
1153 }
1154 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001155
1156 @Override
1157 public void pushGroupMetrics(DeviceId deviceId,
1158 Collection<Group> groupEntries) {
1159 boolean deviceInitialAuditStatus =
1160 deviceInitialAuditStatus(deviceId);
1161 Set<Group> southboundGroupEntries =
1162 Sets.newHashSet(groupEntries);
1163 Set<StoredGroupEntry> storedGroupEntries =
1164 Sets.newHashSet(getStoredGroups(deviceId));
1165 Set<Group> extraneousStoredEntries =
1166 Sets.newHashSet(getExtraneousGroups(deviceId));
1167
1168 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1169 southboundGroupEntries.size(),
1170 deviceId);
1171 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1172 Group group = it.next();
1173 log.trace("Group {} in device {}", group, deviceId);
1174 }
1175
1176 log.trace("Displaying all ({}) stored group entries for device {}",
1177 storedGroupEntries.size(),
1178 deviceId);
1179 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1180 it1.hasNext();) {
1181 Group group = it1.next();
1182 log.trace("Stored Group {} for device {}", group, deviceId);
1183 }
1184
1185 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1186 Group group = it2.next();
1187 if (storedGroupEntries.remove(group)) {
1188 // we both have the group, let's update some info then.
1189 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1190 group.id(), deviceId);
1191 groupAdded(group);
1192 it2.remove();
1193 }
1194 }
1195 for (Group group : southboundGroupEntries) {
1196 if (getGroup(group.deviceId(), group.id()) != null) {
1197 // There is a group existing with the same id
1198 // It is possible that group update is
1199 // in progress while we got a stale info from switch
1200 if (!storedGroupEntries.remove(getGroup(
1201 group.deviceId(), group.id()))) {
1202 log.warn("Group AUDIT: Inconsistent state:"
1203 + "Group exists in ID based table while "
1204 + "not present in key based table");
1205 }
1206 } else {
1207 // there are groups in the switch that aren't in the store
1208 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1209 group.id(), deviceId);
1210 extraneousStoredEntries.remove(group);
1211 extraneousGroup(group);
1212 }
1213 }
1214 for (Group group : storedGroupEntries) {
1215 // there are groups in the store that aren't in the switch
1216 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1217 group.id(), deviceId);
1218 groupMissing(group);
1219 }
1220 for (Group group : extraneousStoredEntries) {
1221 // there are groups in the extraneous store that
1222 // aren't in the switch
1223 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1224 group.id(), deviceId);
1225 removeExtraneousGroupEntry(group);
1226 }
1227
1228 if (!deviceInitialAuditStatus) {
1229 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1230 deviceId);
1231 deviceInitialAuditCompleted(deviceId, true);
1232 }
1233 }
1234
1235 private void groupMissing(Group group) {
1236 switch (group.state()) {
1237 case PENDING_DELETE:
1238 log.debug("Group {} delete confirmation from device {}",
1239 group, group.deviceId());
1240 removeGroupEntry(group);
1241 break;
1242 case ADDED:
1243 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001244 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001245 case PENDING_UPDATE:
1246 log.debug("Group {} is in store but not on device {}",
1247 group, group.deviceId());
1248 StoredGroupEntry existing =
1249 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001250 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001251 existing.id(),
1252 existing.deviceId(),
1253 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001254 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001255 //Re-PUT map entries to trigger map update events
1256 getGroupStoreKeyMap().
1257 put(new GroupStoreKeyMapKey(existing.deviceId(),
1258 existing.appCookie()), existing);
1259 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1260 group));
1261 break;
1262 default:
1263 log.debug("Group {} has not been installed.", group);
1264 break;
1265 }
1266 }
1267
1268 private void extraneousGroup(Group group) {
1269 log.debug("Group {} is on device {} but not in store.",
1270 group, group.deviceId());
1271 addOrUpdateExtraneousGroupEntry(group);
1272 }
1273
1274 private void groupAdded(Group group) {
1275 log.trace("Group {} Added or Updated in device {}",
1276 group, group.deviceId());
1277 addOrUpdateGroupEntry(group);
1278 }
alshabib10580802015-02-18 18:30:33 -08001279}