blob: 5fea69af189c1d7647484575dd6bca684b4372ed [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib10580802015-02-18 18:30:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
alshabibb0285992016-03-28 23:30:37 -070025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080029import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onlab.util.KryoNamespace;
alshabibb0285992016-03-28 23:30:37 -070031import org.onosproject.cfg.ComponentConfigService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080033import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.core.DefaultGroupId;
35import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080037import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070038import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080039import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070040import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080041import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070042import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080043import org.onosproject.net.group.Group;
44import org.onosproject.net.group.Group.GroupState;
45import org.onosproject.net.group.GroupBucket;
46import org.onosproject.net.group.GroupBuckets;
47import org.onosproject.net.group.GroupDescription;
48import org.onosproject.net.group.GroupEvent;
49import org.onosproject.net.group.GroupEvent.Type;
50import org.onosproject.net.group.GroupKey;
51import org.onosproject.net.group.GroupOperation;
52import org.onosproject.net.group.GroupStore;
53import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070054import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080055import org.onosproject.net.group.StoredGroupEntry;
56import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070057import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080059import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.MapEventListener;
alshabibb0285992016-03-28 23:30:37 -070062import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani0b847532016-03-03 13:44:15 -080063import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070064import org.onosproject.store.service.StorageService;
helenyrwua1c41152016-08-18 16:16:14 -070065import org.onosproject.store.service.Topic;
Madan Jampani0b847532016-03-03 13:44:15 -080066import org.onosproject.store.service.Versioned;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053067import org.onosproject.store.service.DistributedPrimitive.Status;
alshabibb0285992016-03-28 23:30:37 -070068import org.osgi.service.component.ComponentContext;
alshabib10580802015-02-18 18:30:33 -080069import org.slf4j.Logger;
70
Jonathan Hart6ec029a2015-03-24 17:12:35 -070071import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070072import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080073import java.util.Collections;
alshabibb0285992016-03-28 23:30:37 -070074import java.util.Dictionary;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070075import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080076import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070077import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080079import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080080import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070082import java.util.Optional;
alshabibb0285992016-03-28 23:30:37 -070083import java.util.Properties;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070084import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070085import java.util.concurrent.ConcurrentHashMap;
86import java.util.concurrent.ConcurrentMap;
87import java.util.concurrent.ExecutorService;
88import java.util.concurrent.Executors;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053089import java.util.concurrent.ScheduledExecutorService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070090import java.util.concurrent.atomic.AtomicInteger;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053091import java.util.function.Consumer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070092import java.util.stream.Collectors;
93
alshabibb0285992016-03-28 23:30:37 -070094import static com.google.common.base.Strings.isNullOrEmpty;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053095import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibb0285992016-03-28 23:30:37 -070096import static org.onlab.util.Tools.get;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070097import static org.onlab.util.Tools.groupedThreads;
98import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080099
100/**
Saurav Das0fd79d92016-03-07 10:58:36 -0800101 * Manages inventory of group entries using distributed group stores from the
102 * storage service.
alshabib10580802015-02-18 18:30:33 -0800103 */
104@Component(immediate = true)
105@Service
106public class DistributedGroupStore
107 extends AbstractStore<GroupEvent, GroupStoreDelegate>
108 implements GroupStore {
109
110 private final Logger log = getLogger(getClass());
111
alshabibb0285992016-03-28 23:30:37 -0700112 private static final boolean GARBAGE_COLLECT = false;
113 private static final int GC_THRESH = 6;
114
alshabib10580802015-02-18 18:30:33 -0800115 private final int dummyId = 0xffffffff;
116 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
117
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ClusterCommunicationService clusterCommunicator;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 protected StorageService storageService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700128 protected MastershipService mastershipService;
129
alshabibb0285992016-03-28 23:30:37 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ComponentConfigService cfgService;
132
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530133 private ScheduledExecutorService executor;
134 private Consumer<Status> statusChangeListener;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800136 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700137 StoredGroupEntry> groupStoreEntriesByKey = null;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700139 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
alshabibb0285992016-03-28 23:30:37 -0700140 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800141 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700142 StoredGroupEntry> auditPendingReqQueue = null;
Frank Wange0eb5ce2016-07-01 18:21:25 +0800143 private MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry>
144 mapListener = new GroupStoreKeyMapListener();
alshabib10580802015-02-18 18:30:33 -0800145 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
146 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 private ExecutorService messageHandlingExecutor;
148 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700149 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800150
151 private final AtomicInteger groupIdGen = new AtomicInteger();
152
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700153 private KryoNamespace clusterMsgSerializer;
154
helenyrwua1c41152016-08-18 16:16:14 -0700155 private static Topic<GroupStoreMessage> groupTopic;
156
alshabibb0285992016-03-28 23:30:37 -0700157 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
158 label = "Enable group garbage collection")
159 private boolean garbageCollect = GARBAGE_COLLECT;
160
161 @Property(name = "gcThresh", intValue = GC_THRESH,
162 label = "Number of rounds for group garbage collection")
163 private int gcThresh = GC_THRESH;
164
165
alshabib10580802015-02-18 18:30:33 -0800166 @Activate
167 public void activate() {
alshabibb0285992016-03-28 23:30:37 -0700168 cfgService.registerProperties(getClass());
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700169 KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
alshabibb0285992016-03-28 23:30:37 -0700170 .register(KryoNamespaces.API)
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700171 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
alshabibb0285992016-03-28 23:30:37 -0700172 .register(DefaultGroup.class,
173 DefaultGroupBucket.class,
174 DefaultGroupDescription.class,
175 DefaultGroupKey.class,
176 GroupDescription.Type.class,
177 Group.GroupState.class,
178 GroupBuckets.class,
alshabibb0285992016-03-28 23:30:37 -0700179 GroupStoreMessage.class,
180 GroupStoreMessage.Type.class,
181 UpdateType.class,
182 GroupStoreMessageSubjects.class,
183 MultiValuedTimestamp.class,
184 GroupStoreKeyMapKey.class,
185 GroupStoreIdMapKey.class,
186 GroupStoreMapKey.class
187 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700188
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700189 clusterMsgSerializer = kryoBuilder.build("GroupStore");
190 Serializer serializer = Serializer.using(clusterMsgSerializer);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700191
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700192 messageHandlingExecutor = Executors.
193 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
194 groupedThreads("onos/store/group",
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700195 "message-handlers",
196 log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700197
198 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
alshabibb0285992016-03-28 23:30:37 -0700199 clusterMsgSerializer::deserialize,
200 this::process,
201 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700202
Madan Jampani0b847532016-03-03 13:44:15 -0800203 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700204
Madan Jampani0b847532016-03-03 13:44:15 -0800205 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
206 .withName("onos-group-store-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700207 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700208 .build();
Frank Wange0eb5ce2016-07-01 18:21:25 +0800209 groupStoreEntriesByKey.addListener(mapListener);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700210 log.debug("Current size of groupstorekeymap:{}",
211 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700212
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530213 log.debug("Creating GroupStoreId Map From GroupStoreKey Map");
214 matchGroupEntries();
215 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/group", "store", log));
216 statusChangeListener = status -> {
217 if (status == Status.ACTIVE) {
218 executor.execute(this::matchGroupEntries);
219 }
220 };
221 groupStoreEntriesByKey.addStatusChangeListener(statusChangeListener);
222
Madan Jampani0b847532016-03-03 13:44:15 -0800223 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700224
Madan Jampani0b847532016-03-03 13:44:15 -0800225 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
226 .withName("onos-pending-group-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700227 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700228 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700229 log.debug("Current size of pendinggroupkeymap:{}",
230 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700231
helenyrwua1c41152016-08-18 16:16:14 -0700232 groupTopic = getOrCreateGroupTopic(serializer);
233 groupTopic.subscribe(this::processGroupMessage);
234
alshabib10580802015-02-18 18:30:33 -0800235 log.info("Started");
236 }
237
238 @Deactivate
239 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800240 groupStoreEntriesByKey.removeListener(mapListener);
alshabibb0285992016-03-28 23:30:37 -0700241 cfgService.unregisterProperties(getClass(), false);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700242 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
alshabib10580802015-02-18 18:30:33 -0800243 log.info("Stopped");
244 }
245
alshabibb0285992016-03-28 23:30:37 -0700246 @Modified
247 public void modified(ComponentContext context) {
248 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
249
250 try {
251 String s = get(properties, "garbageCollect");
252 garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
253
254 s = get(properties, "gcThresh");
255 gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
256 } catch (Exception e) {
257 gcThresh = GC_THRESH;
258 garbageCollect = GARBAGE_COLLECT;
259 }
260 }
261
helenyrwua1c41152016-08-18 16:16:14 -0700262 private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
263 if (groupTopic == null) {
264 return storageService.getTopic("group-failover-notif", serializer);
265 } else {
266 return groupTopic;
267 }
Sho SHIMIZUa6285542017-01-12 15:08:24 -0800268 }
helenyrwua1c41152016-08-18 16:16:14 -0700269
alshabib10580802015-02-18 18:30:33 -0800270 /**
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530271 * Updating values of groupEntriesById.
272 */
273 private void matchGroupEntries() {
274 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupStoreEntriesByKey.asJavaMap().entrySet()) {
275 StoredGroupEntry group = entry.getValue();
276 getGroupIdTable(entry.getKey().deviceId()).put(group.id(), group);
277 }
278 }
279
280 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700281 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800282 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700283 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800284 */
Madan Jampani0b847532016-03-03 13:44:15 -0800285 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700286 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800287 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800288 }
289
290 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700291 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800292 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700293 * @param deviceId identifier of the device
294 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800295 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700296 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700297 return groupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800298 }
299
300 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700301 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800302 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700303 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800304 */
Madan Jampani0b847532016-03-03 13:44:15 -0800305 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700306 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800307 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800308 }
309
310 /**
311 * Returns the extraneous group id table for specified device.
312 *
313 * @param deviceId identifier of the device
314 * @return Map representing group key table of given device.
315 */
316 private ConcurrentMap<GroupId, Group>
317 getExtraneousGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700318 return extraneousGroupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800319 }
320
321 /**
322 * Returns the number of groups for the specified device in the store.
323 *
324 * @return number of groups for the specified device
325 */
326 @Override
327 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700328 return (getGroups(deviceId) != null) ?
alshabibb0285992016-03-28 23:30:37 -0700329 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800330 }
331
332 /**
333 * Returns the groups associated with a device.
334 *
335 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800336 * @return the group entries
337 */
338 @Override
339 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800340 // Let ImmutableSet.copyOf do the type conversion
341 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800342 }
343
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700344 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800345 NodeId master = mastershipService.getMasterFor(deviceId);
346 if (master == null) {
347 log.debug("Failed to getGroups: No master for {}", deviceId);
348 return Collections.emptySet();
349 }
350
351 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
352 .stream()
353 .filter(input -> input.deviceId().equals(deviceId))
354 .collect(Collectors.toSet());
355 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700356 }
357
alshabib10580802015-02-18 18:30:33 -0800358 /**
359 * Returns the stored group entry.
360 *
alshabibb0285992016-03-28 23:30:37 -0700361 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800362 * @param appCookie the group key
alshabib10580802015-02-18 18:30:33 -0800363 * @return a group associated with the key
364 */
365 @Override
366 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700367 return getStoredGroupEntry(deviceId, appCookie);
368 }
369
370 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
371 GroupKey appCookie) {
372 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
373 appCookie));
374 }
375
376 @Override
377 public Group getGroup(DeviceId deviceId, GroupId groupId) {
378 return getStoredGroupEntry(deviceId, groupId);
379 }
380
381 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
382 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700383 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800384 }
385
386 private int getFreeGroupIdValue(DeviceId deviceId) {
387 int freeId = groupIdGen.incrementAndGet();
388
389 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700390 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800391 if (existing == null) {
392 existing = (
393 extraneousGroupEntriesById.get(deviceId) != null) ?
394 extraneousGroupEntriesById.get(deviceId).
395 get(new DefaultGroupId(freeId)) :
396 null;
397 }
398 if (existing != null) {
399 freeId = groupIdGen.incrementAndGet();
400 } else {
401 break;
402 }
403 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700404 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800405 return freeId;
406 }
407
408 /**
409 * Stores a new group entry using the information from group description.
410 *
411 * @param groupDesc group description to be used to create group entry
412 */
413 @Override
414 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700415 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800416 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800417 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
418 if (existingGroup != null) {
Charles Chan216e3c82016-04-23 14:48:16 -0700419 log.info("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800420 groupDesc.appCookie(), groupDesc.deviceId(),
421 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800422 return;
423 }
424
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700425 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700426 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700427 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700428 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700429 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
430 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700431 + "Can not perform add group operation",
432 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700433 //TODO: Send Group operation failure event
434 return;
435 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700436 GroupStoreMessage groupOp = GroupStoreMessage.
437 createGroupAddRequestMsg(groupDesc.deviceId(),
438 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700439
Madan Jampani175e8fd2015-05-20 14:10:45 -0700440 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700441 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
442 clusterMsgSerializer::serialize,
443 mastershipService.getMasterFor(groupDesc.deviceId()))
444 .whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700445 if (error != null) {
446 log.warn("Failed to send request to master: {} to {}",
alshabibb0285992016-03-28 23:30:37 -0700447 groupOp,
448 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700449 //TODO: Send Group operation failure event
450 } else {
451 log.debug("Sent Group operation request for device {} "
alshabibb0285992016-03-28 23:30:37 -0700452 + "to remote MASTER {}",
453 groupDesc.deviceId(),
454 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700455 }
456 });
alshabib10580802015-02-18 18:30:33 -0800457 return;
458 }
459
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700460 log.debug("Store group for device {} is getting handled locally",
461 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800462 storeGroupDescriptionInternal(groupDesc);
463 }
464
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700465 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
466 ConcurrentMap<GroupId, Group> extraneousMap =
467 extraneousGroupEntriesById.get(deviceId);
468 if (extraneousMap == null) {
469 return null;
470 }
471 return extraneousMap.get(new DefaultGroupId(groupId));
472 }
473
474 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
475 GroupBuckets buckets) {
476 ConcurrentMap<GroupId, Group> extraneousMap =
477 extraneousGroupEntriesById.get(deviceId);
478 if (extraneousMap == null) {
479 return null;
480 }
481
alshabibb0285992016-03-28 23:30:37 -0700482 for (Group extraneousGroup : extraneousMap.values()) {
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700483 if (extraneousGroup.buckets().equals(buckets)) {
484 return extraneousGroup;
485 }
486 }
487 return null;
488 }
489
alshabib10580802015-02-18 18:30:33 -0800490 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
491 // Check if a group is existing with the same key
492 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
493 return;
494 }
495
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700496 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
497 // Device group audit has not completed yet
498 // Add this group description to pending group key table
499 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700500 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700501 groupDesc.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700502 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
503 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800504 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700505 getPendingGroupKeyTable();
506 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
507 groupDesc.appCookie()),
508 group);
509 return;
510 }
511
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700512 Group matchingExtraneousGroup = null;
513 if (groupDesc.givenGroupId() != null) {
514 //Check if there is a extraneous group existing with the same Id
515 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
alshabibb0285992016-03-28 23:30:37 -0700516 groupDesc.deviceId(), groupDesc.givenGroupId());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700517 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800518 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
alshabibb0285992016-03-28 23:30:37 -0700519 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700520 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800521 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700522 //Check if the group buckets matches with user provided buckets
523 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
524 //Group is already existing with the same buckets and Id
525 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800526 log.debug("storeGroupDescriptionInternal: Buckets also matching "
alshabibb0285992016-03-28 23:30:37 -0700527 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700528 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800529 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700530 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700531 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700532 // Insert the newly created group entry into key and id maps
533 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700534 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
535 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700536 // Ensure it also inserted into group id based table to
537 // avoid any chances of duplication in group id generation
538 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700539 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700540 addOrUpdateGroupEntry(matchingExtraneousGroup);
541 removeExtraneousGroupEntry(matchingExtraneousGroup);
542 return;
543 } else {
544 //Group buckets are not matching. Update group
545 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800546 log.debug("storeGroupDescriptionInternal: Buckets are not "
alshabibb0285992016-03-28 23:30:37 -0700547 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700548 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800549 Integer.toHexString(groupDesc.givenGroupId()));
550 StoredGroupEntry modifiedGroup = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700551 matchingExtraneousGroup.id(), groupDesc);
Saurav Das0fd79d92016-03-07 10:58:36 -0800552 modifiedGroup.setState(GroupState.PENDING_UPDATE);
553 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700554 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
555 groupDesc.appCookie()), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800556 // Ensure it also inserted into group id based table to
557 // avoid any chances of duplication in group id generation
558 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700559 put(matchingExtraneousGroup.id(), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800560 removeExtraneousGroupEntry(matchingExtraneousGroup);
561 log.debug("storeGroupDescriptionInternal: Triggering Group "
alshabibb0285992016-03-28 23:30:37 -0700562 + "UPDATE request for {} in device {}",
Saurav Das0fd79d92016-03-07 10:58:36 -0800563 matchingExtraneousGroup.id(),
564 groupDesc.deviceId());
565 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
566 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700567 }
568 }
569 } else {
570 //Check if there is an extraneous group with user provided buckets
571 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
alshabibb0285992016-03-28 23:30:37 -0700572 groupDesc.deviceId(), groupDesc.buckets());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700573 if (matchingExtraneousGroup != null) {
574 //Group is already existing with the same buckets.
575 //So reuse this group.
576 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
577 groupDesc.deviceId());
578 //Create a group entry object
579 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700580 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700581 // Insert the newly created group entry into key and id maps
582 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700583 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
584 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700585 // Ensure it also inserted into group id based table to
586 // avoid any chances of duplication in group id generation
587 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700588 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700589 addOrUpdateGroupEntry(matchingExtraneousGroup);
590 removeExtraneousGroupEntry(matchingExtraneousGroup);
591 return;
592 } else {
593 //TODO: Check if there are any empty groups that can be used here
594 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
595 groupDesc.deviceId());
596 }
597 }
598
Saurav Das100e3b82015-04-30 11:12:10 -0700599 GroupId id = null;
600 if (groupDesc.givenGroupId() == null) {
601 // Get a new group identifier
602 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
603 } else {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800604 // we need to use the identifier passed in by caller, but check if
605 // already used
606 Group existing = getGroup(groupDesc.deviceId(),
607 new DefaultGroupId(groupDesc.givenGroupId()));
608 if (existing != null) {
609 log.warn("Group already exists with the same id: 0x{} in dev:{} "
alshabibb0285992016-03-28 23:30:37 -0700610 + "but with different key: {} (request gkey: {})",
611 Integer.toHexString(groupDesc.givenGroupId()),
612 groupDesc.deviceId(),
613 existing.appCookie(),
614 groupDesc.appCookie());
Saurav Das8be4e3a2016-03-11 17:19:07 -0800615 return;
616 }
Saurav Das100e3b82015-04-30 11:12:10 -0700617 id = new DefaultGroupId(groupDesc.givenGroupId());
618 }
alshabib10580802015-02-18 18:30:33 -0800619 // Create a group entry object
620 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700621 // Insert the newly created group entry into key and id maps
622 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700623 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
624 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700625 // Ensure it also inserted into group id based table to
626 // avoid any chances of duplication in group id generation
627 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700628 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700629 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700630 id,
631 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800632 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
633 group));
634 }
635
636 /**
637 * Updates the existing group entry with the information
638 * from group description.
639 *
alshabibb0285992016-03-28 23:30:37 -0700640 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800641 * @param oldAppCookie the current group key
alshabibb0285992016-03-28 23:30:37 -0700642 * @param type update type
643 * @param newBuckets group buckets for updates
alshabib10580802015-02-18 18:30:33 -0800644 * @param newAppCookie optional new group key
645 */
646 @Override
647 public void updateGroupDescription(DeviceId deviceId,
648 GroupKey oldAppCookie,
649 UpdateType type,
650 GroupBuckets newBuckets,
651 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700652 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700653 if (mastershipService.getMasterFor(deviceId) != null &&
654 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700655 log.debug("updateGroupDescription: Device {} local role is not MASTER",
656 deviceId);
657 if (mastershipService.getMasterFor(deviceId) == null) {
658 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700659 + "Can not perform update group operation",
660 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700661 //TODO: Send Group operation failure event
662 return;
663 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700664 GroupStoreMessage groupOp = GroupStoreMessage.
665 createGroupUpdateRequestMsg(deviceId,
666 oldAppCookie,
667 type,
668 newBuckets,
669 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700670
Madan Jampani175e8fd2015-05-20 14:10:45 -0700671 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700672 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
673 clusterMsgSerializer::serialize,
674 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
675 if (error != null) {
676 log.warn("Failed to send request to master: {} to {}",
677 groupOp,
678 mastershipService.getMasterFor(deviceId), error);
679 }
680 //TODO: Send Group operation failure event
681 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700682 return;
683 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700684 log.debug("updateGroupDescription for device {} is getting handled locally",
685 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700686 updateGroupDescriptionInternal(deviceId,
687 oldAppCookie,
688 type,
689 newBuckets,
690 newAppCookie);
691 }
692
693 private void updateGroupDescriptionInternal(DeviceId deviceId,
alshabibb0285992016-03-28 23:30:37 -0700694 GroupKey oldAppCookie,
695 UpdateType type,
696 GroupBuckets newBuckets,
697 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800698 // Check if a group is existing with the provided key
699 Group oldGroup = getGroup(deviceId, oldAppCookie);
700 if (oldGroup == null) {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800701 log.warn("updateGroupDescriptionInternal: Group not found...strange. "
alshabibb0285992016-03-28 23:30:37 -0700702 + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
alshabib10580802015-02-18 18:30:33 -0800703 return;
704 }
705
706 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
707 type,
708 newBuckets);
709 if (newBucketList != null) {
710 // Create a new group object from the old group
711 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
712 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
713 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
714 oldGroup.deviceId(),
715 oldGroup.type(),
716 updatedBuckets,
717 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700718 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800719 oldGroup.appId());
720 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
721 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700722 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
alshabibb0285992016-03-28 23:30:37 -0700723 oldGroup.id(),
724 oldGroup.deviceId(),
725 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800726 newGroup.setState(GroupState.PENDING_UPDATE);
727 newGroup.setLife(oldGroup.life());
728 newGroup.setPackets(oldGroup.packets());
729 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700730 //Update the group entry in groupkey based map.
731 //Update to groupid based map will happen in the
732 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700733 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
734 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700735 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700736 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
737 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800738 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700739 } else {
740 log.warn("updateGroupDescriptionInternal with type {}: No "
alshabibb0285992016-03-28 23:30:37 -0700741 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800742 }
743 }
744
745 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
746 UpdateType type,
747 GroupBuckets buckets) {
Victor Silva0282ab82016-11-15 16:30:27 -0300748 if (type == UpdateType.SET) {
749 return buckets.buckets();
750 }
751
Victor Silvadf1eeae2016-08-12 15:28:57 -0300752 List<GroupBucket> oldBuckets = oldGroup.buckets().buckets();
753 List<GroupBucket> updatedBucketList = new ArrayList<>();
alshabib10580802015-02-18 18:30:33 -0800754 boolean groupDescUpdated = false;
755
756 if (type == UpdateType.ADD) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300757 List<GroupBucket> newBuckets = buckets.buckets();
758
759 // Add old buckets that will not be updated and check if any will be updated.
760 for (GroupBucket oldBucket : oldBuckets) {
761 int newBucketIndex = newBuckets.indexOf(oldBucket);
762
763 if (newBucketIndex != -1) {
764 GroupBucket newBucket = newBuckets.get(newBucketIndex);
765 if (!newBucket.hasSameParameters(oldBucket)) {
766 // Bucket will be updated
767 groupDescUpdated = true;
768 }
769 } else {
770 // Old bucket will remain the same - add it.
771 updatedBucketList.add(oldBucket);
alshabib10580802015-02-18 18:30:33 -0800772 }
773 }
Victor Silvadf1eeae2016-08-12 15:28:57 -0300774
775 // Add all new buckets
776 updatedBucketList.addAll(newBuckets);
777 if (!oldBuckets.containsAll(newBuckets)) {
778 groupDescUpdated = true;
779 }
780
alshabib10580802015-02-18 18:30:33 -0800781 } else if (type == UpdateType.REMOVE) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300782 List<GroupBucket> bucketsToRemove = buckets.buckets();
783
784 // Check which old buckets should remain
785 for (GroupBucket oldBucket : oldBuckets) {
786 if (!bucketsToRemove.contains(oldBucket)) {
787 updatedBucketList.add(oldBucket);
788 } else {
alshabib10580802015-02-18 18:30:33 -0800789 groupDescUpdated = true;
790 }
791 }
792 }
793
794 if (groupDescUpdated) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300795 return updatedBucketList;
alshabib10580802015-02-18 18:30:33 -0800796 } else {
797 return null;
798 }
799 }
800
801 /**
802 * Triggers deleting the existing group entry.
803 *
alshabibb0285992016-03-28 23:30:37 -0700804 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800805 * @param appCookie the group key
806 */
807 @Override
808 public void deleteGroupDescription(DeviceId deviceId,
809 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700810 // Check if group to be deleted by a remote instance
811 if (mastershipService.
812 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700813 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
814 deviceId);
815 if (mastershipService.getMasterFor(deviceId) == null) {
816 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700817 + "Can not perform delete group operation",
818 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700819 //TODO: Send Group operation failure event
820 return;
821 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700822 GroupStoreMessage groupOp = GroupStoreMessage.
823 createGroupDeleteRequestMsg(deviceId,
824 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700825
Madan Jampani175e8fd2015-05-20 14:10:45 -0700826 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700827 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
828 clusterMsgSerializer::serialize,
829 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
830 if (error != null) {
831 log.warn("Failed to send request to master: {} to {}",
832 groupOp,
833 mastershipService.getMasterFor(deviceId), error);
834 }
835 //TODO: Send Group operation failure event
836 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700837 return;
838 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700839 log.debug("deleteGroupDescription in device {} is getting handled locally",
840 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700841 deleteGroupDescriptionInternal(deviceId, appCookie);
842 }
843
844 private void deleteGroupDescriptionInternal(DeviceId deviceId,
845 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800846 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700847 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800848 if (existing == null) {
849 return;
850 }
851
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700852 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
alshabibb0285992016-03-28 23:30:37 -0700853 existing.id(),
854 existing.deviceId(),
855 existing.state());
alshabib10580802015-02-18 18:30:33 -0800856 synchronized (existing) {
857 existing.setState(GroupState.PENDING_DELETE);
Saurav Das80980c72016-03-23 11:22:49 -0700858 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700859 put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
860 existing);
alshabib10580802015-02-18 18:30:33 -0800861 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700862 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
863 deviceId);
alshabib10580802015-02-18 18:30:33 -0800864 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
865 }
866
867 /**
868 * Stores a new group entry, or updates an existing entry.
869 *
870 * @param group group entry
871 */
872 @Override
873 public void addOrUpdateGroupEntry(Group group) {
874 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700875 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
876 group.id());
alshabib10580802015-02-18 18:30:33 -0800877 GroupEvent event = null;
878
879 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800880 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700881 group.id(),
882 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800883 synchronized (existing) {
alshabibb0285992016-03-28 23:30:37 -0700884 for (GroupBucket bucket : group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700885 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700886 existing.buckets().buckets()
alshabibb0285992016-03-28 23:30:37 -0700887 .stream()
888 .filter((existingBucket) -> (existingBucket.equals(bucket)))
889 .findFirst();
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700890 if (matchingBucket.isPresent()) {
891 ((StoredGroupBucketEntry) matchingBucket.
892 get()).setPackets(bucket.packets());
893 ((StoredGroupBucketEntry) matchingBucket.
894 get()).setBytes(bucket.bytes());
895 } else {
896 log.warn("addOrUpdateGroupEntry: No matching "
alshabibb0285992016-03-28 23:30:37 -0700897 + "buckets to update stats");
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700898 }
899 }
alshabib10580802015-02-18 18:30:33 -0800900 existing.setLife(group.life());
901 existing.setPackets(group.packets());
902 existing.setBytes(group.bytes());
alshabibb0285992016-03-28 23:30:37 -0700903 existing.setReferenceCount(group.referenceCount());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700904 if ((existing.state() == GroupState.PENDING_ADD) ||
alshabibb0285992016-03-28 23:30:37 -0700905 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800906 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700907 existing.id(),
908 existing.deviceId(),
909 existing.state());
alshabib10580802015-02-18 18:30:33 -0800910 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700911 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800912 event = new GroupEvent(Type.GROUP_ADDED, existing);
913 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800914 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700915 existing.id(),
916 existing.deviceId(),
917 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700918 existing.setState(GroupState.ADDED);
919 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800920 event = new GroupEvent(Type.GROUP_UPDATED, existing);
921 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700922 //Re-PUT map entries to trigger map update events
923 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700924 put(new GroupStoreKeyMapKey(existing.deviceId(),
925 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800926 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700927 } else {
928 log.warn("addOrUpdateGroupEntry: Group update "
alshabibb0285992016-03-28 23:30:37 -0700929 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800930 }
931
932 if (event != null) {
933 notifyDelegate(event);
934 }
935 }
936
937 /**
938 * Removes the group entry from store.
939 *
940 * @param group group entry
941 */
942 @Override
943 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700944 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
945 group.id());
alshabib10580802015-02-18 18:30:33 -0800946
947 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700948 log.debug("removeGroupEntry: removing group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700949 group.id(),
950 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700951 //Removal from groupid based map will happen in the
952 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700953 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
954 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800955 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700956 } else {
957 log.warn("removeGroupEntry for {} in device{} is "
alshabibb0285992016-03-28 23:30:37 -0700958 + "not existing in our maps",
959 group.id(),
960 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800961 }
962 }
963
Victor Silva4e8b7832016-08-17 17:11:19 -0300964 private void purgeGroupEntries(Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entries) {
965 entries.forEach(entry -> {
966 groupStoreEntriesByKey.remove(entry.getKey());
967 });
968 }
969
alshabib10580802015-02-18 18:30:33 -0800970 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800971 public void purgeGroupEntry(DeviceId deviceId) {
Victor Silva4e8b7832016-08-17 17:11:19 -0300972 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
Charles Chan0c7c43b2016-01-14 17:39:20 -0800973 new HashSet<>();
974
Madan Jampani0b847532016-03-03 13:44:15 -0800975 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -0800976 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Victor Silva4e8b7832016-08-17 17:11:19 -0300977 .forEach(entriesPendingRemove::add);
Charles Chan0c7c43b2016-01-14 17:39:20 -0800978
Victor Silva4e8b7832016-08-17 17:11:19 -0300979 purgeGroupEntries(entriesPendingRemove);
980 }
981
982 @Override
983 public void purgeGroupEntries() {
984 purgeGroupEntries(getGroupStoreKeyMap().entrySet());
Charles Chan0c7c43b2016-01-14 17:39:20 -0800985 }
986
987 @Override
alshabib10580802015-02-18 18:30:33 -0800988 public void deviceInitialAuditCompleted(DeviceId deviceId,
989 boolean completed) {
990 synchronized (deviceAuditStatus) {
991 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700992 log.debug("AUDIT completed for device {}",
993 deviceId);
alshabib10580802015-02-18 18:30:33 -0800994 deviceAuditStatus.put(deviceId, true);
995 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700996 List<StoredGroupEntry> pendingGroupRequests =
997 getPendingGroupKeyTable().values()
alshabibb0285992016-03-28 23:30:37 -0700998 .stream()
999 .filter(g -> g.deviceId().equals(deviceId))
1000 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001001 log.debug("processing pending group add requests for device {} and number of pending requests {}",
alshabibb0285992016-03-28 23:30:37 -07001002 deviceId,
1003 pendingGroupRequests.size());
1004 for (Group group : pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -08001005 GroupDescription tmp = new DefaultGroupDescription(
1006 group.deviceId(),
1007 group.type(),
1008 group.buckets(),
1009 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -07001010 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -08001011 group.appId());
1012 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001013 getPendingGroupKeyTable().
alshabibb0285992016-03-28 23:30:37 -07001014 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -08001015 }
alshabib10580802015-02-18 18:30:33 -08001016 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001017 Boolean audited = deviceAuditStatus.get(deviceId);
1018 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001019 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -08001020 deviceAuditStatus.put(deviceId, false);
1021 }
1022 }
1023 }
1024 }
1025
1026 @Override
1027 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
1028 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001029 Boolean audited = deviceAuditStatus.get(deviceId);
1030 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -08001031 }
1032 }
1033
1034 @Override
1035 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
1036
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001037 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
1038 operation.groupId());
alshabib10580802015-02-18 18:30:33 -08001039
1040 if (existing == null) {
1041 log.warn("No group entry with ID {} found ", operation.groupId());
1042 return;
1043 }
1044
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001045 log.warn("groupOperationFailed: group operation {} failed"
alshabibb0285992016-03-28 23:30:37 -07001046 + "for group {} in device {} with code {}",
1047 operation.opType(),
1048 existing.id(),
1049 existing.deviceId(),
1050 operation.failureCode());
Saurav Das0fd79d92016-03-07 10:58:36 -08001051 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
1052 log.warn("Current extraneous groups in device:{} are: {}",
1053 deviceId,
1054 getExtraneousGroups(deviceId));
Saurav Das8be4e3a2016-03-11 17:19:07 -08001055 if (operation.buckets().equals(existing.buckets())) {
1056 if (existing.state() == GroupState.PENDING_ADD) {
1057 log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
alshabibb0285992016-03-28 23:30:37 -07001058 + "add state - moving to ADDED for group {} in device {}",
1059 existing.id(), deviceId);
Saurav Das8be4e3a2016-03-11 17:19:07 -08001060 addOrUpdateGroupEntry(existing);
1061 return;
1062 } else {
1063 log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
alshabibb0285992016-03-28 23:30:37 -07001064 + "Operation: {} Existing: {}", operation.buckets(),
1065 existing.buckets());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001066 }
1067 }
Saurav Das0fd79d92016-03-07 10:58:36 -08001068 }
alshabib10580802015-02-18 18:30:33 -08001069 switch (operation.opType()) {
1070 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001071 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001072 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
1073 log.warn("groupOperationFailed: cleaningup "
alshabibb0285992016-03-28 23:30:37 -07001074 + "group {} from store in device {}....",
1075 existing.id(),
1076 existing.deviceId());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001077 //Removal from groupid based map will happen in the
1078 //map update listener
1079 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
1080 existing.appCookie()));
1081 }
alshabib10580802015-02-18 18:30:33 -08001082 break;
1083 case MODIFY:
1084 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
1085 break;
1086 case DELETE:
1087 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
1088 break;
1089 default:
1090 log.warn("Unknown group operation type {}", operation.opType());
1091 }
alshabib10580802015-02-18 18:30:33 -08001092 }
1093
1094 @Override
1095 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001096 log.debug("add/update extraneous group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001097 group.id(),
1098 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001099 ConcurrentMap<GroupId, Group> extraneousIdTable =
1100 getExtraneousGroupIdTable(group.deviceId());
1101 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -07001102 // Don't remove the extraneous groups, instead re-use it when
1103 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -08001104 }
1105
1106 @Override
1107 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001108 log.debug("remove extraneous group entry {} of device {} from store",
alshabibb0285992016-03-28 23:30:37 -07001109 group.id(),
1110 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001111 ConcurrentMap<GroupId, Group> extraneousIdTable =
1112 getExtraneousGroupIdTable(group.deviceId());
1113 extraneousIdTable.remove(group.id());
1114 }
1115
1116 @Override
1117 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
1118 // flatten and make iterator unmodifiable
1119 return FluentIterable.from(
1120 getExtraneousGroupIdTable(deviceId).values());
1121 }
1122
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001123 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001124 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001125 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001126 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001127 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001128
1129 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001130 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001131 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001132 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001133 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001134 if ((key == null) && (group == null)) {
1135 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001136 + "event {} with null entry", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001137 return;
1138 } else if (group == null) {
1139 group = getGroupIdTable(key.deviceId()).values()
1140 .stream()
1141 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
Yuta HIGUCHI6e5f4702016-11-21 11:42:11 -08001142 .findFirst().orElse(null);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001143 if (group == null) {
1144 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001145 + "event {} with null entry... can not process", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001146 return;
1147 }
1148 }
1149 log.trace("received groupid map event {} for id {} in device {}",
1150 mapEvent.type(),
1151 group.id(),
jaegonkim68e080c2016-12-01 22:31:01 +09001152 (key != null ? key.deviceId() : null));
Madan Jampani0b847532016-03-03 13:44:15 -08001153 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001154 // Update the group ID table
1155 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001156 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1157 if (value.state() == Group.GroupState.ADDED) {
1158 if (value.isGroupStateAddedFirstTime()) {
1159 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001160 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001161 group.id(),
1162 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001163 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001164 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001165 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001166 group.id(),
1167 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001168 }
1169 }
Madan Jampani0b847532016-03-03 13:44:15 -08001170 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001171 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001172 // Remove the entry from the group ID table
1173 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001174 }
1175
1176 if (groupEvent != null) {
1177 notifyDelegate(groupEvent);
1178 }
1179 }
1180 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001181
helenyrwua1c41152016-08-18 16:16:14 -07001182 private void processGroupMessage(GroupStoreMessage message) {
1183 if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1184 // FIXME: groupStoreEntriesByKey inaccessible here
1185 getGroupIdTable(message.deviceId()).values()
1186 .stream()
1187 .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1188 .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1189 }
1190 }
1191
Madan Jampani01e05fb2015-08-13 13:29:36 -07001192 private void process(GroupStoreMessage groupOp) {
1193 log.debug("Received remote group operation {} request for device {}",
alshabibb0285992016-03-28 23:30:37 -07001194 groupOp.type(),
1195 groupOp.deviceId());
1196 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1197 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1198 return;
1199 }
1200 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1201 storeGroupDescriptionInternal(groupOp.groupDesc());
1202 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1203 updateGroupDescriptionInternal(groupOp.deviceId(),
1204 groupOp.appCookie(),
1205 groupOp.updateType(),
1206 groupOp.updateBuckets(),
1207 groupOp.newAppCookie());
1208 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1209 deleteGroupDescriptionInternal(groupOp.deviceId(),
1210 groupOp.appCookie());
1211 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001212 }
1213
1214 /**
1215 * Flattened map key to be used to store group entries.
1216 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001217 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001218 private final DeviceId deviceId;
1219
1220 public GroupStoreMapKey(DeviceId deviceId) {
1221 this.deviceId = deviceId;
1222 }
1223
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001224 public DeviceId deviceId() {
1225 return deviceId;
1226 }
1227
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001228 @Override
1229 public boolean equals(Object o) {
1230 if (this == o) {
1231 return true;
1232 }
1233 if (!(o instanceof GroupStoreMapKey)) {
1234 return false;
1235 }
1236 GroupStoreMapKey that = (GroupStoreMapKey) o;
1237 return this.deviceId.equals(that.deviceId);
1238 }
1239
1240 @Override
1241 public int hashCode() {
1242 int result = 17;
1243
1244 result = 31 * result + Objects.hash(this.deviceId);
1245
1246 return result;
1247 }
1248 }
1249
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001250 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001251 private final GroupKey appCookie;
alshabibb0285992016-03-28 23:30:37 -07001252
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001253 public GroupStoreKeyMapKey(DeviceId deviceId,
1254 GroupKey appCookie) {
1255 super(deviceId);
1256 this.appCookie = appCookie;
1257 }
1258
1259 @Override
1260 public boolean equals(Object o) {
1261 if (this == o) {
1262 return true;
1263 }
1264 if (!(o instanceof GroupStoreKeyMapKey)) {
1265 return false;
1266 }
1267 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1268 return (super.equals(that) &&
1269 this.appCookie.equals(that.appCookie));
1270 }
1271
1272 @Override
1273 public int hashCode() {
1274 int result = 17;
1275
1276 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1277
1278 return result;
1279 }
1280 }
1281
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001282 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001283 private final GroupId groupId;
alshabibb0285992016-03-28 23:30:37 -07001284
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001285 public GroupStoreIdMapKey(DeviceId deviceId,
1286 GroupId groupId) {
1287 super(deviceId);
1288 this.groupId = groupId;
1289 }
1290
1291 @Override
1292 public boolean equals(Object o) {
1293 if (this == o) {
1294 return true;
1295 }
1296 if (!(o instanceof GroupStoreIdMapKey)) {
1297 return false;
1298 }
1299 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1300 return (super.equals(that) &&
1301 this.groupId.equals(that.groupId));
1302 }
1303
1304 @Override
1305 public int hashCode() {
1306 int result = 17;
1307
1308 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1309
1310 return result;
1311 }
1312 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001313
1314 @Override
1315 public void pushGroupMetrics(DeviceId deviceId,
1316 Collection<Group> groupEntries) {
1317 boolean deviceInitialAuditStatus =
1318 deviceInitialAuditStatus(deviceId);
1319 Set<Group> southboundGroupEntries =
1320 Sets.newHashSet(groupEntries);
1321 Set<StoredGroupEntry> storedGroupEntries =
1322 Sets.newHashSet(getStoredGroups(deviceId));
1323 Set<Group> extraneousStoredEntries =
1324 Sets.newHashSet(getExtraneousGroups(deviceId));
1325
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001326 if (log.isTraceEnabled()) {
1327 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1328 southboundGroupEntries.size(),
1329 deviceId);
1330 for (Group group : southboundGroupEntries) {
1331 log.trace("Group {} in device {}", group, deviceId);
1332 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001333
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001334 log.trace("Displaying all ({}) stored group entries for device {}",
1335 storedGroupEntries.size(),
1336 deviceId);
1337 for (StoredGroupEntry group : storedGroupEntries) {
1338 log.trace("Stored Group {} for device {}", group, deviceId);
1339 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001340 }
1341
alshabibb0285992016-03-28 23:30:37 -07001342 garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
1343
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001344 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1345 Group group = it2.next();
1346 if (storedGroupEntries.remove(group)) {
1347 // we both have the group, let's update some info then.
1348 log.trace("Group AUDIT: group {} exists in both planes for device {}",
alshabibb0285992016-03-28 23:30:37 -07001349 group.id(), deviceId);
1350
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001351 groupAdded(group);
1352 it2.remove();
1353 }
1354 }
1355 for (Group group : southboundGroupEntries) {
1356 if (getGroup(group.deviceId(), group.id()) != null) {
1357 // There is a group existing with the same id
1358 // It is possible that group update is
1359 // in progress while we got a stale info from switch
1360 if (!storedGroupEntries.remove(getGroup(
alshabibb0285992016-03-28 23:30:37 -07001361 group.deviceId(), group.id()))) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001362 log.warn("Group AUDIT: Inconsistent state:"
alshabibb0285992016-03-28 23:30:37 -07001363 + "Group exists in ID based table while "
1364 + "not present in key based table");
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001365 }
1366 } else {
1367 // there are groups in the switch that aren't in the store
1368 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001369 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001370 extraneousStoredEntries.remove(group);
1371 extraneousGroup(group);
1372 }
1373 }
1374 for (Group group : storedGroupEntries) {
1375 // there are groups in the store that aren't in the switch
1376 log.debug("Group AUDIT: group {} missing in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001377 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001378 groupMissing(group);
1379 }
1380 for (Group group : extraneousStoredEntries) {
1381 // there are groups in the extraneous store that
1382 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001383 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
alshabibb0285992016-03-28 23:30:37 -07001384 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001385 removeExtraneousGroupEntry(group);
1386 }
1387
1388 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001389 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
alshabibb0285992016-03-28 23:30:37 -07001390 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001391 deviceInitialAuditCompleted(deviceId, true);
1392 }
1393 }
1394
helenyrwu89470f12016-08-12 13:18:10 -07001395 @Override
1396 public void notifyOfFailovers(Collection<Group> failoverGroups) {
helenyrwu89470f12016-08-12 13:18:10 -07001397 failoverGroups.forEach(group -> {
1398 if (group.type() == Group.Type.FAILOVER) {
helenyrwua1c41152016-08-18 16:16:14 -07001399 groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1400 group.deviceId(), group));
helenyrwu89470f12016-08-12 13:18:10 -07001401 }
1402 });
helenyrwu89470f12016-08-12 13:18:10 -07001403 }
1404
alshabibb0285992016-03-28 23:30:37 -07001405 private void garbageCollect(DeviceId deviceId,
1406 Set<Group> southboundGroupEntries,
1407 Set<StoredGroupEntry> storedGroupEntries) {
1408 if (!garbageCollect) {
1409 return;
1410 }
1411
1412 Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
1413 while (it.hasNext()) {
1414 StoredGroupEntry group = it.next();
1415 if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
1416 log.debug("Garbage collecting group {} on {}", group, deviceId);
1417 deleteGroupDescription(deviceId, group.appCookie());
1418 southboundGroupEntries.remove(group);
1419 it.remove();
1420 }
1421 }
1422 }
1423
1424 private boolean checkGroupRefCount(Group group) {
1425 return (group.referenceCount() == 0 && group.age() >= gcThresh);
1426 }
1427
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001428 private void groupMissing(Group group) {
1429 switch (group.state()) {
1430 case PENDING_DELETE:
1431 log.debug("Group {} delete confirmation from device {}",
1432 group, group.deviceId());
1433 removeGroupEntry(group);
1434 break;
1435 case ADDED:
1436 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001437 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001438 case PENDING_UPDATE:
1439 log.debug("Group {} is in store but not on device {}",
1440 group, group.deviceId());
1441 StoredGroupEntry existing =
1442 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001443 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
alshabibb0285992016-03-28 23:30:37 -07001444 existing.id(),
1445 existing.deviceId(),
1446 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001447 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001448 //Re-PUT map entries to trigger map update events
1449 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -07001450 put(new GroupStoreKeyMapKey(existing.deviceId(),
1451 existing.appCookie()), existing);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001452 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1453 group));
1454 break;
1455 default:
1456 log.debug("Group {} has not been installed.", group);
1457 break;
1458 }
1459 }
1460
1461 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001462 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001463 group, group.deviceId());
1464 addOrUpdateExtraneousGroupEntry(group);
1465 }
1466
1467 private void groupAdded(Group group) {
1468 log.trace("Group {} Added or Updated in device {}",
1469 group, group.deviceId());
1470 addOrUpdateGroupEntry(group);
1471 }
alshabib10580802015-02-18 18:30:33 -08001472}