blob: 61c70baa03d3eb07d1010b2812e5222212b0152e [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib10580802015-02-18 18:30:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
alshabibb0285992016-03-28 23:30:37 -070025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080029import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onlab.util.KryoNamespace;
alshabibb0285992016-03-28 23:30:37 -070031import org.onosproject.cfg.ComponentConfigService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080033import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070057import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080058import org.onosproject.store.service.ConsistentMap;
59import org.onosproject.store.service.MapEvent;
60import org.onosproject.store.service.MapEventListener;
alshabibb0285992016-03-28 23:30:37 -070061import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani0b847532016-03-03 13:44:15 -080062import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070063import org.onosproject.store.service.StorageService;
helenyrwua1c41152016-08-18 16:16:14 -070064import org.onosproject.store.service.Topic;
Madan Jampani0b847532016-03-03 13:44:15 -080065import org.onosproject.store.service.Versioned;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053066import org.onosproject.store.service.DistributedPrimitive.Status;
alshabibb0285992016-03-28 23:30:37 -070067import org.osgi.service.component.ComponentContext;
alshabib10580802015-02-18 18:30:33 -080068import org.slf4j.Logger;
69
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080072import java.util.Collections;
alshabibb0285992016-03-28 23:30:37 -070073import java.util.Dictionary;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080075import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080078import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080079import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070080import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070081import java.util.Optional;
alshabibb0285992016-03-28 23:30:37 -070082import java.util.Properties;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070083import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070084import java.util.concurrent.ConcurrentHashMap;
85import java.util.concurrent.ConcurrentMap;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053088import java.util.concurrent.ScheduledExecutorService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070089import java.util.concurrent.atomic.AtomicInteger;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053090import java.util.function.Consumer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070091import java.util.stream.Collectors;
92
alshabibb0285992016-03-28 23:30:37 -070093import static com.google.common.base.Strings.isNullOrEmpty;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053094import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibb0285992016-03-28 23:30:37 -070095import static org.onlab.util.Tools.get;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070096import static org.onlab.util.Tools.groupedThreads;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080098
99/**
Saurav Das0fd79d92016-03-07 10:58:36 -0800100 * Manages inventory of group entries using distributed group stores from the
101 * storage service.
alshabib10580802015-02-18 18:30:33 -0800102 */
103@Component(immediate = true)
104@Service
105public class DistributedGroupStore
106 extends AbstractStore<GroupEvent, GroupStoreDelegate>
107 implements GroupStore {
108
109 private final Logger log = getLogger(getClass());
110
alshabibb0285992016-03-28 23:30:37 -0700111 private static final boolean GARBAGE_COLLECT = false;
112 private static final int GC_THRESH = 6;
113
alshabib10580802015-02-18 18:30:33 -0800114 private final int dummyId = 0xffffffff;
Yi Tsengfa394de2017-02-01 11:26:40 -0800115 private final GroupId dummyGroupId = new GroupId(dummyId);
alshabib10580802015-02-18 18:30:33 -0800116
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected ClusterCommunicationService clusterCommunicator;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700124 protected StorageService storageService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700127 protected MastershipService mastershipService;
128
alshabibb0285992016-03-28 23:30:37 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected ComponentConfigService cfgService;
131
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530132 private ScheduledExecutorService executor;
133 private Consumer<Status> statusChangeListener;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800135 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700136 StoredGroupEntry> groupStoreEntriesByKey = null;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700137 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700138 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
alshabibb0285992016-03-28 23:30:37 -0700139 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800140 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700141 StoredGroupEntry> auditPendingReqQueue = null;
Frank Wange0eb5ce2016-07-01 18:21:25 +0800142 private MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry>
143 mapListener = new GroupStoreKeyMapListener();
alshabib10580802015-02-18 18:30:33 -0800144 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
145 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700146 private ExecutorService messageHandlingExecutor;
147 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700148 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800149
150 private final AtomicInteger groupIdGen = new AtomicInteger();
151
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700152 private KryoNamespace clusterMsgSerializer;
153
helenyrwua1c41152016-08-18 16:16:14 -0700154 private static Topic<GroupStoreMessage> groupTopic;
155
alshabibb0285992016-03-28 23:30:37 -0700156 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
157 label = "Enable group garbage collection")
158 private boolean garbageCollect = GARBAGE_COLLECT;
159
160 @Property(name = "gcThresh", intValue = GC_THRESH,
161 label = "Number of rounds for group garbage collection")
162 private int gcThresh = GC_THRESH;
163
164
alshabib10580802015-02-18 18:30:33 -0800165 @Activate
166 public void activate() {
alshabibb0285992016-03-28 23:30:37 -0700167 cfgService.registerProperties(getClass());
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700168 KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
alshabibb0285992016-03-28 23:30:37 -0700169 .register(KryoNamespaces.API)
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700170 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
alshabibb0285992016-03-28 23:30:37 -0700171 .register(DefaultGroup.class,
172 DefaultGroupBucket.class,
173 DefaultGroupDescription.class,
174 DefaultGroupKey.class,
175 GroupDescription.Type.class,
176 Group.GroupState.class,
177 GroupBuckets.class,
alshabibb0285992016-03-28 23:30:37 -0700178 GroupStoreMessage.class,
179 GroupStoreMessage.Type.class,
180 UpdateType.class,
181 GroupStoreMessageSubjects.class,
182 MultiValuedTimestamp.class,
183 GroupStoreKeyMapKey.class,
184 GroupStoreIdMapKey.class,
185 GroupStoreMapKey.class
186 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700187
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700188 clusterMsgSerializer = kryoBuilder.build("GroupStore");
189 Serializer serializer = Serializer.using(clusterMsgSerializer);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700190
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700191 messageHandlingExecutor = Executors.
192 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
193 groupedThreads("onos/store/group",
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700194 "message-handlers",
195 log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700196
197 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
alshabibb0285992016-03-28 23:30:37 -0700198 clusterMsgSerializer::deserialize,
199 this::process,
200 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700201
Madan Jampani0b847532016-03-03 13:44:15 -0800202 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700203
Madan Jampani0b847532016-03-03 13:44:15 -0800204 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
205 .withName("onos-group-store-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700206 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700207 .build();
Frank Wange0eb5ce2016-07-01 18:21:25 +0800208 groupStoreEntriesByKey.addListener(mapListener);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700209 log.debug("Current size of groupstorekeymap:{}",
210 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700211
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530212 log.debug("Creating GroupStoreId Map From GroupStoreKey Map");
213 matchGroupEntries();
214 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/group", "store", log));
215 statusChangeListener = status -> {
216 if (status == Status.ACTIVE) {
217 executor.execute(this::matchGroupEntries);
218 }
219 };
220 groupStoreEntriesByKey.addStatusChangeListener(statusChangeListener);
221
Madan Jampani0b847532016-03-03 13:44:15 -0800222 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700223
Madan Jampani0b847532016-03-03 13:44:15 -0800224 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
225 .withName("onos-pending-group-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700226 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700227 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700228 log.debug("Current size of pendinggroupkeymap:{}",
229 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230
helenyrwua1c41152016-08-18 16:16:14 -0700231 groupTopic = getOrCreateGroupTopic(serializer);
232 groupTopic.subscribe(this::processGroupMessage);
233
alshabib10580802015-02-18 18:30:33 -0800234 log.info("Started");
235 }
236
237 @Deactivate
238 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800239 groupStoreEntriesByKey.removeListener(mapListener);
alshabibb0285992016-03-28 23:30:37 -0700240 cfgService.unregisterProperties(getClass(), false);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700241 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
alshabib10580802015-02-18 18:30:33 -0800242 log.info("Stopped");
243 }
244
alshabibb0285992016-03-28 23:30:37 -0700245 @Modified
246 public void modified(ComponentContext context) {
247 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
248
249 try {
250 String s = get(properties, "garbageCollect");
251 garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
252
253 s = get(properties, "gcThresh");
254 gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
255 } catch (Exception e) {
256 gcThresh = GC_THRESH;
257 garbageCollect = GARBAGE_COLLECT;
258 }
259 }
260
helenyrwua1c41152016-08-18 16:16:14 -0700261 private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
262 if (groupTopic == null) {
263 return storageService.getTopic("group-failover-notif", serializer);
264 } else {
265 return groupTopic;
266 }
Sho SHIMIZUa6285542017-01-12 15:08:24 -0800267 }
helenyrwua1c41152016-08-18 16:16:14 -0700268
alshabib10580802015-02-18 18:30:33 -0800269 /**
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530270 * Updating values of groupEntriesById.
271 */
272 private void matchGroupEntries() {
273 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupStoreEntriesByKey.asJavaMap().entrySet()) {
274 StoredGroupEntry group = entry.getValue();
275 getGroupIdTable(entry.getKey().deviceId()).put(group.id(), group);
276 }
277 }
278
279 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700280 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800281 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700282 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800283 */
Madan Jampani0b847532016-03-03 13:44:15 -0800284 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700285 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800286 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800287 }
288
289 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700290 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800291 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700292 * @param deviceId identifier of the device
293 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800294 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700295 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700296 return groupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800297 }
298
299 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700300 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800301 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800303 */
Madan Jampani0b847532016-03-03 13:44:15 -0800304 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700305 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800306 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800307 }
308
309 /**
310 * Returns the extraneous group id table for specified device.
311 *
312 * @param deviceId identifier of the device
313 * @return Map representing group key table of given device.
314 */
315 private ConcurrentMap<GroupId, Group>
316 getExtraneousGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700317 return extraneousGroupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800318 }
319
320 /**
321 * Returns the number of groups for the specified device in the store.
322 *
323 * @return number of groups for the specified device
324 */
325 @Override
326 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700327 return (getGroups(deviceId) != null) ?
alshabibb0285992016-03-28 23:30:37 -0700328 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800329 }
330
331 /**
332 * Returns the groups associated with a device.
333 *
334 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800335 * @return the group entries
336 */
337 @Override
338 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800339 // Let ImmutableSet.copyOf do the type conversion
340 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800341 }
342
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700343 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800344 NodeId master = mastershipService.getMasterFor(deviceId);
345 if (master == null) {
346 log.debug("Failed to getGroups: No master for {}", deviceId);
347 return Collections.emptySet();
348 }
349
350 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
351 .stream()
352 .filter(input -> input.deviceId().equals(deviceId))
353 .collect(Collectors.toSet());
354 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700355 }
356
alshabib10580802015-02-18 18:30:33 -0800357 /**
358 * Returns the stored group entry.
359 *
alshabibb0285992016-03-28 23:30:37 -0700360 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800361 * @param appCookie the group key
alshabib10580802015-02-18 18:30:33 -0800362 * @return a group associated with the key
363 */
364 @Override
365 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700366 return getStoredGroupEntry(deviceId, appCookie);
367 }
368
369 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
370 GroupKey appCookie) {
371 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
372 appCookie));
373 }
374
375 @Override
376 public Group getGroup(DeviceId deviceId, GroupId groupId) {
377 return getStoredGroupEntry(deviceId, groupId);
378 }
379
380 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
381 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700382 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800383 }
384
385 private int getFreeGroupIdValue(DeviceId deviceId) {
386 int freeId = groupIdGen.incrementAndGet();
387
388 while (true) {
Yi Tsengfa394de2017-02-01 11:26:40 -0800389 Group existing = getGroup(deviceId, new GroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800390 if (existing == null) {
391 existing = (
392 extraneousGroupEntriesById.get(deviceId) != null) ?
393 extraneousGroupEntriesById.get(deviceId).
Yi Tsengfa394de2017-02-01 11:26:40 -0800394 get(new GroupId(freeId)) :
alshabib10580802015-02-18 18:30:33 -0800395 null;
396 }
397 if (existing != null) {
398 freeId = groupIdGen.incrementAndGet();
399 } else {
400 break;
401 }
402 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700403 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800404 return freeId;
405 }
406
407 /**
408 * Stores a new group entry using the information from group description.
409 *
410 * @param groupDesc group description to be used to create group entry
411 */
412 @Override
413 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700414 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800415 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800416 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
417 if (existingGroup != null) {
Charles Chan216e3c82016-04-23 14:48:16 -0700418 log.info("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800419 groupDesc.appCookie(), groupDesc.deviceId(),
420 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800421 return;
422 }
423
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700424 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700425 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700426 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700427 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700428 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
429 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700430 + "Can not perform add group operation",
431 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700432 //TODO: Send Group operation failure event
433 return;
434 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700435 GroupStoreMessage groupOp = GroupStoreMessage.
436 createGroupAddRequestMsg(groupDesc.deviceId(),
437 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700438
Madan Jampani175e8fd2015-05-20 14:10:45 -0700439 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700440 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
441 clusterMsgSerializer::serialize,
442 mastershipService.getMasterFor(groupDesc.deviceId()))
443 .whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700444 if (error != null) {
445 log.warn("Failed to send request to master: {} to {}",
alshabibb0285992016-03-28 23:30:37 -0700446 groupOp,
447 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700448 //TODO: Send Group operation failure event
449 } else {
450 log.debug("Sent Group operation request for device {} "
alshabibb0285992016-03-28 23:30:37 -0700451 + "to remote MASTER {}",
452 groupDesc.deviceId(),
453 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700454 }
455 });
alshabib10580802015-02-18 18:30:33 -0800456 return;
457 }
458
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700459 log.debug("Store group for device {} is getting handled locally",
460 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800461 storeGroupDescriptionInternal(groupDesc);
462 }
463
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700464 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
465 ConcurrentMap<GroupId, Group> extraneousMap =
466 extraneousGroupEntriesById.get(deviceId);
467 if (extraneousMap == null) {
468 return null;
469 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800470 return extraneousMap.get(new GroupId(groupId));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700471 }
472
473 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
474 GroupBuckets buckets) {
475 ConcurrentMap<GroupId, Group> extraneousMap =
476 extraneousGroupEntriesById.get(deviceId);
477 if (extraneousMap == null) {
478 return null;
479 }
480
alshabibb0285992016-03-28 23:30:37 -0700481 for (Group extraneousGroup : extraneousMap.values()) {
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700482 if (extraneousGroup.buckets().equals(buckets)) {
483 return extraneousGroup;
484 }
485 }
486 return null;
487 }
488
alshabib10580802015-02-18 18:30:33 -0800489 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
490 // Check if a group is existing with the same key
491 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
492 return;
493 }
494
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700495 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
496 // Device group audit has not completed yet
497 // Add this group description to pending group key table
498 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700499 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700500 groupDesc.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700501 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
502 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800503 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700504 getPendingGroupKeyTable();
505 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
506 groupDesc.appCookie()),
507 group);
508 return;
509 }
510
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700511 Group matchingExtraneousGroup = null;
512 if (groupDesc.givenGroupId() != null) {
513 //Check if there is a extraneous group existing with the same Id
514 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
alshabibb0285992016-03-28 23:30:37 -0700515 groupDesc.deviceId(), groupDesc.givenGroupId());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700516 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800517 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
alshabibb0285992016-03-28 23:30:37 -0700518 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700519 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800520 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700521 //Check if the group buckets matches with user provided buckets
522 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
523 //Group is already existing with the same buckets and Id
524 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800525 log.debug("storeGroupDescriptionInternal: Buckets also matching "
alshabibb0285992016-03-28 23:30:37 -0700526 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700527 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800528 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700529 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700530 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700531 // Insert the newly created group entry into key and id maps
532 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700533 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
534 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700535 // Ensure it also inserted into group id based table to
536 // avoid any chances of duplication in group id generation
537 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700538 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700539 addOrUpdateGroupEntry(matchingExtraneousGroup);
540 removeExtraneousGroupEntry(matchingExtraneousGroup);
541 return;
542 } else {
543 //Group buckets are not matching. Update group
544 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800545 log.debug("storeGroupDescriptionInternal: Buckets are not "
alshabibb0285992016-03-28 23:30:37 -0700546 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700547 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800548 Integer.toHexString(groupDesc.givenGroupId()));
549 StoredGroupEntry modifiedGroup = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700550 matchingExtraneousGroup.id(), groupDesc);
Saurav Das0fd79d92016-03-07 10:58:36 -0800551 modifiedGroup.setState(GroupState.PENDING_UPDATE);
552 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700553 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
554 groupDesc.appCookie()), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800555 // Ensure it also inserted into group id based table to
556 // avoid any chances of duplication in group id generation
557 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700558 put(matchingExtraneousGroup.id(), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800559 removeExtraneousGroupEntry(matchingExtraneousGroup);
560 log.debug("storeGroupDescriptionInternal: Triggering Group "
alshabibb0285992016-03-28 23:30:37 -0700561 + "UPDATE request for {} in device {}",
Saurav Das0fd79d92016-03-07 10:58:36 -0800562 matchingExtraneousGroup.id(),
563 groupDesc.deviceId());
564 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
565 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700566 }
567 }
568 } else {
569 //Check if there is an extraneous group with user provided buckets
570 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
alshabibb0285992016-03-28 23:30:37 -0700571 groupDesc.deviceId(), groupDesc.buckets());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700572 if (matchingExtraneousGroup != null) {
573 //Group is already existing with the same buckets.
574 //So reuse this group.
575 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
576 groupDesc.deviceId());
577 //Create a group entry object
578 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700579 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700580 // Insert the newly created group entry into key and id maps
581 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700582 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
583 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700584 // Ensure it also inserted into group id based table to
585 // avoid any chances of duplication in group id generation
586 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700587 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700588 addOrUpdateGroupEntry(matchingExtraneousGroup);
589 removeExtraneousGroupEntry(matchingExtraneousGroup);
590 return;
591 } else {
592 //TODO: Check if there are any empty groups that can be used here
593 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
594 groupDesc.deviceId());
595 }
596 }
597
Saurav Das100e3b82015-04-30 11:12:10 -0700598 GroupId id = null;
599 if (groupDesc.givenGroupId() == null) {
600 // Get a new group identifier
Yi Tsengfa394de2017-02-01 11:26:40 -0800601 id = new GroupId(getFreeGroupIdValue(groupDesc.deviceId()));
Saurav Das100e3b82015-04-30 11:12:10 -0700602 } else {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800603 // we need to use the identifier passed in by caller, but check if
604 // already used
605 Group existing = getGroup(groupDesc.deviceId(),
Yi Tsengfa394de2017-02-01 11:26:40 -0800606 new GroupId(groupDesc.givenGroupId()));
Saurav Das8be4e3a2016-03-11 17:19:07 -0800607 if (existing != null) {
608 log.warn("Group already exists with the same id: 0x{} in dev:{} "
alshabibb0285992016-03-28 23:30:37 -0700609 + "but with different key: {} (request gkey: {})",
610 Integer.toHexString(groupDesc.givenGroupId()),
611 groupDesc.deviceId(),
612 existing.appCookie(),
613 groupDesc.appCookie());
Saurav Das8be4e3a2016-03-11 17:19:07 -0800614 return;
615 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800616 id = new GroupId(groupDesc.givenGroupId());
Saurav Das100e3b82015-04-30 11:12:10 -0700617 }
alshabib10580802015-02-18 18:30:33 -0800618 // Create a group entry object
619 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700620 // Insert the newly created group entry into key and id maps
621 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700622 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
623 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700624 // Ensure it also inserted into group id based table to
625 // avoid any chances of duplication in group id generation
626 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700627 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700628 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700629 id,
630 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800631 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
632 group));
633 }
634
635 /**
636 * Updates the existing group entry with the information
637 * from group description.
638 *
alshabibb0285992016-03-28 23:30:37 -0700639 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800640 * @param oldAppCookie the current group key
alshabibb0285992016-03-28 23:30:37 -0700641 * @param type update type
642 * @param newBuckets group buckets for updates
alshabib10580802015-02-18 18:30:33 -0800643 * @param newAppCookie optional new group key
644 */
645 @Override
646 public void updateGroupDescription(DeviceId deviceId,
647 GroupKey oldAppCookie,
648 UpdateType type,
649 GroupBuckets newBuckets,
650 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700651 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700652 if (mastershipService.getMasterFor(deviceId) != null &&
653 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700654 log.debug("updateGroupDescription: Device {} local role is not MASTER",
655 deviceId);
656 if (mastershipService.getMasterFor(deviceId) == null) {
657 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700658 + "Can not perform update group operation",
659 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700660 //TODO: Send Group operation failure event
661 return;
662 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700663 GroupStoreMessage groupOp = GroupStoreMessage.
664 createGroupUpdateRequestMsg(deviceId,
665 oldAppCookie,
666 type,
667 newBuckets,
668 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700669
Madan Jampani175e8fd2015-05-20 14:10:45 -0700670 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700671 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
672 clusterMsgSerializer::serialize,
673 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
674 if (error != null) {
675 log.warn("Failed to send request to master: {} to {}",
676 groupOp,
677 mastershipService.getMasterFor(deviceId), error);
678 }
679 //TODO: Send Group operation failure event
680 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700681 return;
682 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700683 log.debug("updateGroupDescription for device {} is getting handled locally",
684 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700685 updateGroupDescriptionInternal(deviceId,
686 oldAppCookie,
687 type,
688 newBuckets,
689 newAppCookie);
690 }
691
692 private void updateGroupDescriptionInternal(DeviceId deviceId,
alshabibb0285992016-03-28 23:30:37 -0700693 GroupKey oldAppCookie,
694 UpdateType type,
695 GroupBuckets newBuckets,
696 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800697 // Check if a group is existing with the provided key
698 Group oldGroup = getGroup(deviceId, oldAppCookie);
699 if (oldGroup == null) {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800700 log.warn("updateGroupDescriptionInternal: Group not found...strange. "
alshabibb0285992016-03-28 23:30:37 -0700701 + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
alshabib10580802015-02-18 18:30:33 -0800702 return;
703 }
704
705 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
706 type,
707 newBuckets);
708 if (newBucketList != null) {
709 // Create a new group object from the old group
710 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
711 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
712 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
713 oldGroup.deviceId(),
714 oldGroup.type(),
715 updatedBuckets,
716 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700717 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800718 oldGroup.appId());
719 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
720 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700721 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
alshabibb0285992016-03-28 23:30:37 -0700722 oldGroup.id(),
723 oldGroup.deviceId(),
724 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800725 newGroup.setState(GroupState.PENDING_UPDATE);
726 newGroup.setLife(oldGroup.life());
727 newGroup.setPackets(oldGroup.packets());
728 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700729 //Update the group entry in groupkey based map.
730 //Update to groupid based map will happen in the
731 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700732 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
733 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700734 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700735 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
736 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800737 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700738 } else {
739 log.warn("updateGroupDescriptionInternal with type {}: No "
alshabibb0285992016-03-28 23:30:37 -0700740 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800741 }
742 }
743
744 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
745 UpdateType type,
746 GroupBuckets buckets) {
Victor Silva0282ab82016-11-15 16:30:27 -0300747 if (type == UpdateType.SET) {
748 return buckets.buckets();
749 }
750
Victor Silvadf1eeae2016-08-12 15:28:57 -0300751 List<GroupBucket> oldBuckets = oldGroup.buckets().buckets();
752 List<GroupBucket> updatedBucketList = new ArrayList<>();
alshabib10580802015-02-18 18:30:33 -0800753 boolean groupDescUpdated = false;
754
755 if (type == UpdateType.ADD) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300756 List<GroupBucket> newBuckets = buckets.buckets();
757
758 // Add old buckets that will not be updated and check if any will be updated.
759 for (GroupBucket oldBucket : oldBuckets) {
760 int newBucketIndex = newBuckets.indexOf(oldBucket);
761
762 if (newBucketIndex != -1) {
763 GroupBucket newBucket = newBuckets.get(newBucketIndex);
764 if (!newBucket.hasSameParameters(oldBucket)) {
765 // Bucket will be updated
766 groupDescUpdated = true;
767 }
768 } else {
769 // Old bucket will remain the same - add it.
770 updatedBucketList.add(oldBucket);
alshabib10580802015-02-18 18:30:33 -0800771 }
772 }
Victor Silvadf1eeae2016-08-12 15:28:57 -0300773
774 // Add all new buckets
775 updatedBucketList.addAll(newBuckets);
776 if (!oldBuckets.containsAll(newBuckets)) {
777 groupDescUpdated = true;
778 }
779
alshabib10580802015-02-18 18:30:33 -0800780 } else if (type == UpdateType.REMOVE) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300781 List<GroupBucket> bucketsToRemove = buckets.buckets();
782
783 // Check which old buckets should remain
784 for (GroupBucket oldBucket : oldBuckets) {
785 if (!bucketsToRemove.contains(oldBucket)) {
786 updatedBucketList.add(oldBucket);
787 } else {
alshabib10580802015-02-18 18:30:33 -0800788 groupDescUpdated = true;
789 }
790 }
791 }
792
793 if (groupDescUpdated) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300794 return updatedBucketList;
alshabib10580802015-02-18 18:30:33 -0800795 } else {
796 return null;
797 }
798 }
799
800 /**
801 * Triggers deleting the existing group entry.
802 *
alshabibb0285992016-03-28 23:30:37 -0700803 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800804 * @param appCookie the group key
805 */
806 @Override
807 public void deleteGroupDescription(DeviceId deviceId,
808 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700809 // Check if group to be deleted by a remote instance
810 if (mastershipService.
811 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700812 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
813 deviceId);
814 if (mastershipService.getMasterFor(deviceId) == null) {
815 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700816 + "Can not perform delete group operation",
817 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700818 //TODO: Send Group operation failure event
819 return;
820 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700821 GroupStoreMessage groupOp = GroupStoreMessage.
822 createGroupDeleteRequestMsg(deviceId,
823 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700824
Madan Jampani175e8fd2015-05-20 14:10:45 -0700825 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700826 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
827 clusterMsgSerializer::serialize,
828 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
829 if (error != null) {
830 log.warn("Failed to send request to master: {} to {}",
831 groupOp,
832 mastershipService.getMasterFor(deviceId), error);
833 }
834 //TODO: Send Group operation failure event
835 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700836 return;
837 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700838 log.debug("deleteGroupDescription in device {} is getting handled locally",
839 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 deleteGroupDescriptionInternal(deviceId, appCookie);
841 }
842
843 private void deleteGroupDescriptionInternal(DeviceId deviceId,
844 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800845 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700846 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800847 if (existing == null) {
848 return;
849 }
850
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700851 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
alshabibb0285992016-03-28 23:30:37 -0700852 existing.id(),
853 existing.deviceId(),
854 existing.state());
alshabib10580802015-02-18 18:30:33 -0800855 synchronized (existing) {
856 existing.setState(GroupState.PENDING_DELETE);
Saurav Das80980c72016-03-23 11:22:49 -0700857 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700858 put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
859 existing);
alshabib10580802015-02-18 18:30:33 -0800860 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700861 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
862 deviceId);
alshabib10580802015-02-18 18:30:33 -0800863 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
864 }
865
866 /**
867 * Stores a new group entry, or updates an existing entry.
868 *
869 * @param group group entry
870 */
871 @Override
872 public void addOrUpdateGroupEntry(Group group) {
873 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700874 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
875 group.id());
alshabib10580802015-02-18 18:30:33 -0800876 GroupEvent event = null;
877
878 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800879 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700880 group.id(),
881 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800882 synchronized (existing) {
alshabibb0285992016-03-28 23:30:37 -0700883 for (GroupBucket bucket : group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700884 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700885 existing.buckets().buckets()
alshabibb0285992016-03-28 23:30:37 -0700886 .stream()
887 .filter((existingBucket) -> (existingBucket.equals(bucket)))
888 .findFirst();
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700889 if (matchingBucket.isPresent()) {
890 ((StoredGroupBucketEntry) matchingBucket.
891 get()).setPackets(bucket.packets());
892 ((StoredGroupBucketEntry) matchingBucket.
893 get()).setBytes(bucket.bytes());
894 } else {
895 log.warn("addOrUpdateGroupEntry: No matching "
alshabibb0285992016-03-28 23:30:37 -0700896 + "buckets to update stats");
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700897 }
898 }
alshabib10580802015-02-18 18:30:33 -0800899 existing.setLife(group.life());
900 existing.setPackets(group.packets());
901 existing.setBytes(group.bytes());
alshabibb0285992016-03-28 23:30:37 -0700902 existing.setReferenceCount(group.referenceCount());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700903 if ((existing.state() == GroupState.PENDING_ADD) ||
alshabibb0285992016-03-28 23:30:37 -0700904 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800905 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700906 existing.id(),
907 existing.deviceId(),
908 existing.state());
alshabib10580802015-02-18 18:30:33 -0800909 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700910 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800911 event = new GroupEvent(Type.GROUP_ADDED, existing);
912 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800913 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700914 existing.id(),
915 existing.deviceId(),
916 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700917 existing.setState(GroupState.ADDED);
918 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800919 event = new GroupEvent(Type.GROUP_UPDATED, existing);
920 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700921 //Re-PUT map entries to trigger map update events
922 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700923 put(new GroupStoreKeyMapKey(existing.deviceId(),
924 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800925 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700926 } else {
927 log.warn("addOrUpdateGroupEntry: Group update "
alshabibb0285992016-03-28 23:30:37 -0700928 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800929 }
930
931 if (event != null) {
932 notifyDelegate(event);
933 }
934 }
935
936 /**
937 * Removes the group entry from store.
938 *
939 * @param group group entry
940 */
941 @Override
942 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700943 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
944 group.id());
alshabib10580802015-02-18 18:30:33 -0800945
946 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700947 log.debug("removeGroupEntry: removing group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700948 group.id(),
949 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700950 //Removal from groupid based map will happen in the
951 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700952 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
953 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800954 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700955 } else {
956 log.warn("removeGroupEntry for {} in device{} is "
alshabibb0285992016-03-28 23:30:37 -0700957 + "not existing in our maps",
958 group.id(),
959 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800960 }
961 }
962
Victor Silva4e8b7832016-08-17 17:11:19 -0300963 private void purgeGroupEntries(Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entries) {
964 entries.forEach(entry -> {
965 groupStoreEntriesByKey.remove(entry.getKey());
966 });
967 }
968
alshabib10580802015-02-18 18:30:33 -0800969 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800970 public void purgeGroupEntry(DeviceId deviceId) {
Victor Silva4e8b7832016-08-17 17:11:19 -0300971 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
Charles Chan0c7c43b2016-01-14 17:39:20 -0800972 new HashSet<>();
973
Madan Jampani0b847532016-03-03 13:44:15 -0800974 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -0800975 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Victor Silva4e8b7832016-08-17 17:11:19 -0300976 .forEach(entriesPendingRemove::add);
Charles Chan0c7c43b2016-01-14 17:39:20 -0800977
Victor Silva4e8b7832016-08-17 17:11:19 -0300978 purgeGroupEntries(entriesPendingRemove);
979 }
980
981 @Override
982 public void purgeGroupEntries() {
983 purgeGroupEntries(getGroupStoreKeyMap().entrySet());
Charles Chan0c7c43b2016-01-14 17:39:20 -0800984 }
985
986 @Override
alshabib10580802015-02-18 18:30:33 -0800987 public void deviceInitialAuditCompleted(DeviceId deviceId,
988 boolean completed) {
989 synchronized (deviceAuditStatus) {
990 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700991 log.debug("AUDIT completed for device {}",
992 deviceId);
alshabib10580802015-02-18 18:30:33 -0800993 deviceAuditStatus.put(deviceId, true);
994 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700995 List<StoredGroupEntry> pendingGroupRequests =
996 getPendingGroupKeyTable().values()
alshabibb0285992016-03-28 23:30:37 -0700997 .stream()
998 .filter(g -> g.deviceId().equals(deviceId))
999 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001000 log.debug("processing pending group add requests for device {} and number of pending requests {}",
alshabibb0285992016-03-28 23:30:37 -07001001 deviceId,
1002 pendingGroupRequests.size());
1003 for (Group group : pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -08001004 GroupDescription tmp = new DefaultGroupDescription(
1005 group.deviceId(),
1006 group.type(),
1007 group.buckets(),
1008 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -07001009 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -08001010 group.appId());
1011 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001012 getPendingGroupKeyTable().
alshabibb0285992016-03-28 23:30:37 -07001013 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -08001014 }
alshabib10580802015-02-18 18:30:33 -08001015 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001016 Boolean audited = deviceAuditStatus.get(deviceId);
1017 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001018 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -08001019 deviceAuditStatus.put(deviceId, false);
1020 }
1021 }
1022 }
1023 }
1024
1025 @Override
1026 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
1027 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001028 Boolean audited = deviceAuditStatus.get(deviceId);
1029 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -08001030 }
1031 }
1032
1033 @Override
1034 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
1035
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001036 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
1037 operation.groupId());
alshabib10580802015-02-18 18:30:33 -08001038
1039 if (existing == null) {
1040 log.warn("No group entry with ID {} found ", operation.groupId());
1041 return;
1042 }
1043
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001044 log.warn("groupOperationFailed: group operation {} failed"
alshabibb0285992016-03-28 23:30:37 -07001045 + "for group {} in device {} with code {}",
1046 operation.opType(),
1047 existing.id(),
1048 existing.deviceId(),
1049 operation.failureCode());
Saurav Das0fd79d92016-03-07 10:58:36 -08001050 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
1051 log.warn("Current extraneous groups in device:{} are: {}",
1052 deviceId,
1053 getExtraneousGroups(deviceId));
Saurav Das8be4e3a2016-03-11 17:19:07 -08001054 if (operation.buckets().equals(existing.buckets())) {
1055 if (existing.state() == GroupState.PENDING_ADD) {
1056 log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
alshabibb0285992016-03-28 23:30:37 -07001057 + "add state - moving to ADDED for group {} in device {}",
1058 existing.id(), deviceId);
Saurav Das8be4e3a2016-03-11 17:19:07 -08001059 addOrUpdateGroupEntry(existing);
1060 return;
1061 } else {
1062 log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
alshabibb0285992016-03-28 23:30:37 -07001063 + "Operation: {} Existing: {}", operation.buckets(),
1064 existing.buckets());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001065 }
1066 }
Saurav Das0fd79d92016-03-07 10:58:36 -08001067 }
alshabib10580802015-02-18 18:30:33 -08001068 switch (operation.opType()) {
1069 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001070 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001071 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
1072 log.warn("groupOperationFailed: cleaningup "
alshabibb0285992016-03-28 23:30:37 -07001073 + "group {} from store in device {}....",
1074 existing.id(),
1075 existing.deviceId());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001076 //Removal from groupid based map will happen in the
1077 //map update listener
1078 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
1079 existing.appCookie()));
1080 }
alshabib10580802015-02-18 18:30:33 -08001081 break;
1082 case MODIFY:
1083 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
1084 break;
1085 case DELETE:
1086 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
1087 break;
1088 default:
1089 log.warn("Unknown group operation type {}", operation.opType());
1090 }
alshabib10580802015-02-18 18:30:33 -08001091 }
1092
1093 @Override
1094 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001095 log.debug("add/update extraneous group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001096 group.id(),
1097 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001098 ConcurrentMap<GroupId, Group> extraneousIdTable =
1099 getExtraneousGroupIdTable(group.deviceId());
1100 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -07001101 // Don't remove the extraneous groups, instead re-use it when
1102 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -08001103 }
1104
1105 @Override
1106 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001107 log.debug("remove extraneous group entry {} of device {} from store",
alshabibb0285992016-03-28 23:30:37 -07001108 group.id(),
1109 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001110 ConcurrentMap<GroupId, Group> extraneousIdTable =
1111 getExtraneousGroupIdTable(group.deviceId());
1112 extraneousIdTable.remove(group.id());
1113 }
1114
1115 @Override
1116 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
1117 // flatten and make iterator unmodifiable
1118 return FluentIterable.from(
1119 getExtraneousGroupIdTable(deviceId).values());
1120 }
1121
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001122 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001123 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001124 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001125 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001126 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001127
1128 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001129 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001130 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001131 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001132 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001133 if ((key == null) && (group == null)) {
1134 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001135 + "event {} with null entry", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001136 return;
1137 } else if (group == null) {
1138 group = getGroupIdTable(key.deviceId()).values()
1139 .stream()
1140 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
Yuta HIGUCHI6e5f4702016-11-21 11:42:11 -08001141 .findFirst().orElse(null);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001142 if (group == null) {
1143 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001144 + "event {} with null entry... can not process", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001145 return;
1146 }
1147 }
1148 log.trace("received groupid map event {} for id {} in device {}",
1149 mapEvent.type(),
1150 group.id(),
jaegonkim68e080c2016-12-01 22:31:01 +09001151 (key != null ? key.deviceId() : null));
Madan Jampani0b847532016-03-03 13:44:15 -08001152 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001153 // Update the group ID table
1154 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001155 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1156 if (value.state() == Group.GroupState.ADDED) {
1157 if (value.isGroupStateAddedFirstTime()) {
1158 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001159 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001160 group.id(),
1161 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001162 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001163 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001164 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001165 group.id(),
1166 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001167 }
1168 }
Madan Jampani0b847532016-03-03 13:44:15 -08001169 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001170 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001171 // Remove the entry from the group ID table
1172 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001173 }
1174
1175 if (groupEvent != null) {
1176 notifyDelegate(groupEvent);
1177 }
1178 }
1179 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001180
helenyrwua1c41152016-08-18 16:16:14 -07001181 private void processGroupMessage(GroupStoreMessage message) {
1182 if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1183 // FIXME: groupStoreEntriesByKey inaccessible here
1184 getGroupIdTable(message.deviceId()).values()
1185 .stream()
1186 .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1187 .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1188 }
1189 }
1190
Madan Jampani01e05fb2015-08-13 13:29:36 -07001191 private void process(GroupStoreMessage groupOp) {
1192 log.debug("Received remote group operation {} request for device {}",
alshabibb0285992016-03-28 23:30:37 -07001193 groupOp.type(),
1194 groupOp.deviceId());
1195 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1196 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1197 return;
1198 }
1199 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1200 storeGroupDescriptionInternal(groupOp.groupDesc());
1201 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1202 updateGroupDescriptionInternal(groupOp.deviceId(),
1203 groupOp.appCookie(),
1204 groupOp.updateType(),
1205 groupOp.updateBuckets(),
1206 groupOp.newAppCookie());
1207 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1208 deleteGroupDescriptionInternal(groupOp.deviceId(),
1209 groupOp.appCookie());
1210 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001211 }
1212
1213 /**
1214 * Flattened map key to be used to store group entries.
1215 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001216 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001217 private final DeviceId deviceId;
1218
1219 public GroupStoreMapKey(DeviceId deviceId) {
1220 this.deviceId = deviceId;
1221 }
1222
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001223 public DeviceId deviceId() {
1224 return deviceId;
1225 }
1226
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001227 @Override
1228 public boolean equals(Object o) {
1229 if (this == o) {
1230 return true;
1231 }
1232 if (!(o instanceof GroupStoreMapKey)) {
1233 return false;
1234 }
1235 GroupStoreMapKey that = (GroupStoreMapKey) o;
1236 return this.deviceId.equals(that.deviceId);
1237 }
1238
1239 @Override
1240 public int hashCode() {
1241 int result = 17;
1242
1243 result = 31 * result + Objects.hash(this.deviceId);
1244
1245 return result;
1246 }
1247 }
1248
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001249 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001250 private final GroupKey appCookie;
alshabibb0285992016-03-28 23:30:37 -07001251
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001252 public GroupStoreKeyMapKey(DeviceId deviceId,
1253 GroupKey appCookie) {
1254 super(deviceId);
1255 this.appCookie = appCookie;
1256 }
1257
1258 @Override
1259 public boolean equals(Object o) {
1260 if (this == o) {
1261 return true;
1262 }
1263 if (!(o instanceof GroupStoreKeyMapKey)) {
1264 return false;
1265 }
1266 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1267 return (super.equals(that) &&
1268 this.appCookie.equals(that.appCookie));
1269 }
1270
1271 @Override
1272 public int hashCode() {
1273 int result = 17;
1274
1275 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1276
1277 return result;
1278 }
1279 }
1280
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001281 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001282 private final GroupId groupId;
alshabibb0285992016-03-28 23:30:37 -07001283
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001284 public GroupStoreIdMapKey(DeviceId deviceId,
1285 GroupId groupId) {
1286 super(deviceId);
1287 this.groupId = groupId;
1288 }
1289
1290 @Override
1291 public boolean equals(Object o) {
1292 if (this == o) {
1293 return true;
1294 }
1295 if (!(o instanceof GroupStoreIdMapKey)) {
1296 return false;
1297 }
1298 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1299 return (super.equals(that) &&
1300 this.groupId.equals(that.groupId));
1301 }
1302
1303 @Override
1304 public int hashCode() {
1305 int result = 17;
1306
1307 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1308
1309 return result;
1310 }
1311 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001312
1313 @Override
1314 public void pushGroupMetrics(DeviceId deviceId,
1315 Collection<Group> groupEntries) {
1316 boolean deviceInitialAuditStatus =
1317 deviceInitialAuditStatus(deviceId);
1318 Set<Group> southboundGroupEntries =
1319 Sets.newHashSet(groupEntries);
1320 Set<StoredGroupEntry> storedGroupEntries =
1321 Sets.newHashSet(getStoredGroups(deviceId));
1322 Set<Group> extraneousStoredEntries =
1323 Sets.newHashSet(getExtraneousGroups(deviceId));
1324
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001325 if (log.isTraceEnabled()) {
1326 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1327 southboundGroupEntries.size(),
1328 deviceId);
1329 for (Group group : southboundGroupEntries) {
1330 log.trace("Group {} in device {}", group, deviceId);
1331 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001332
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001333 log.trace("Displaying all ({}) stored group entries for device {}",
1334 storedGroupEntries.size(),
1335 deviceId);
1336 for (StoredGroupEntry group : storedGroupEntries) {
1337 log.trace("Stored Group {} for device {}", group, deviceId);
1338 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001339 }
1340
alshabibb0285992016-03-28 23:30:37 -07001341 garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
1342
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001343 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1344 Group group = it2.next();
1345 if (storedGroupEntries.remove(group)) {
1346 // we both have the group, let's update some info then.
1347 log.trace("Group AUDIT: group {} exists in both planes for device {}",
alshabibb0285992016-03-28 23:30:37 -07001348 group.id(), deviceId);
1349
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001350 groupAdded(group);
1351 it2.remove();
1352 }
1353 }
1354 for (Group group : southboundGroupEntries) {
1355 if (getGroup(group.deviceId(), group.id()) != null) {
1356 // There is a group existing with the same id
1357 // It is possible that group update is
1358 // in progress while we got a stale info from switch
1359 if (!storedGroupEntries.remove(getGroup(
alshabibb0285992016-03-28 23:30:37 -07001360 group.deviceId(), group.id()))) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001361 log.warn("Group AUDIT: Inconsistent state:"
alshabibb0285992016-03-28 23:30:37 -07001362 + "Group exists in ID based table while "
1363 + "not present in key based table");
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001364 }
1365 } else {
1366 // there are groups in the switch that aren't in the store
1367 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001368 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001369 extraneousStoredEntries.remove(group);
1370 extraneousGroup(group);
1371 }
1372 }
1373 for (Group group : storedGroupEntries) {
1374 // there are groups in the store that aren't in the switch
1375 log.debug("Group AUDIT: group {} missing in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001376 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001377 groupMissing(group);
1378 }
1379 for (Group group : extraneousStoredEntries) {
1380 // there are groups in the extraneous store that
1381 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001382 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
alshabibb0285992016-03-28 23:30:37 -07001383 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001384 removeExtraneousGroupEntry(group);
1385 }
1386
1387 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001388 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
alshabibb0285992016-03-28 23:30:37 -07001389 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001390 deviceInitialAuditCompleted(deviceId, true);
1391 }
1392 }
1393
helenyrwu89470f12016-08-12 13:18:10 -07001394 @Override
1395 public void notifyOfFailovers(Collection<Group> failoverGroups) {
helenyrwu89470f12016-08-12 13:18:10 -07001396 failoverGroups.forEach(group -> {
1397 if (group.type() == Group.Type.FAILOVER) {
helenyrwua1c41152016-08-18 16:16:14 -07001398 groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1399 group.deviceId(), group));
helenyrwu89470f12016-08-12 13:18:10 -07001400 }
1401 });
helenyrwu89470f12016-08-12 13:18:10 -07001402 }
1403
alshabibb0285992016-03-28 23:30:37 -07001404 private void garbageCollect(DeviceId deviceId,
1405 Set<Group> southboundGroupEntries,
1406 Set<StoredGroupEntry> storedGroupEntries) {
1407 if (!garbageCollect) {
1408 return;
1409 }
1410
1411 Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
1412 while (it.hasNext()) {
1413 StoredGroupEntry group = it.next();
1414 if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
1415 log.debug("Garbage collecting group {} on {}", group, deviceId);
1416 deleteGroupDescription(deviceId, group.appCookie());
1417 southboundGroupEntries.remove(group);
1418 it.remove();
1419 }
1420 }
1421 }
1422
1423 private boolean checkGroupRefCount(Group group) {
1424 return (group.referenceCount() == 0 && group.age() >= gcThresh);
1425 }
1426
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001427 private void groupMissing(Group group) {
1428 switch (group.state()) {
1429 case PENDING_DELETE:
1430 log.debug("Group {} delete confirmation from device {}",
1431 group, group.deviceId());
1432 removeGroupEntry(group);
1433 break;
1434 case ADDED:
1435 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001436 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001437 case PENDING_UPDATE:
1438 log.debug("Group {} is in store but not on device {}",
1439 group, group.deviceId());
1440 StoredGroupEntry existing =
1441 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001442 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
alshabibb0285992016-03-28 23:30:37 -07001443 existing.id(),
1444 existing.deviceId(),
1445 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001446 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001447 //Re-PUT map entries to trigger map update events
1448 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -07001449 put(new GroupStoreKeyMapKey(existing.deviceId(),
1450 existing.appCookie()), existing);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001451 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1452 group));
1453 break;
1454 default:
1455 log.debug("Group {} has not been installed.", group);
1456 break;
1457 }
1458 }
1459
1460 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001461 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001462 group, group.deviceId());
1463 addOrUpdateExtraneousGroupEntry(group);
1464 }
1465
1466 private void groupAdded(Group group) {
1467 log.trace("Group {} Added or Updated in device {}",
1468 group, group.deviceId());
1469 addOrUpdateGroupEntry(group);
1470 }
alshabib10580802015-02-18 18:30:33 -08001471}