blob: ebeb0e2e29456561185a74ac69d6a0f558e5964f [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib10580802015-02-18 18:30:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
alshabibb0285992016-03-28 23:30:37 -070025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080029import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onlab.util.KryoNamespace;
alshabibb0285992016-03-28 23:30:37 -070031import org.onosproject.cfg.ComponentConfigService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080033import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.core.DefaultGroupId;
35import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080037import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070038import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080039import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070040import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080041import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070042import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080043import org.onosproject.net.group.Group;
44import org.onosproject.net.group.Group.GroupState;
45import org.onosproject.net.group.GroupBucket;
46import org.onosproject.net.group.GroupBuckets;
47import org.onosproject.net.group.GroupDescription;
48import org.onosproject.net.group.GroupEvent;
49import org.onosproject.net.group.GroupEvent.Type;
50import org.onosproject.net.group.GroupKey;
51import org.onosproject.net.group.GroupOperation;
52import org.onosproject.net.group.GroupStore;
53import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070054import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080055import org.onosproject.net.group.StoredGroupEntry;
56import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070057import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080059import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.MapEventListener;
alshabibb0285992016-03-28 23:30:37 -070062import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani0b847532016-03-03 13:44:15 -080063import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070064import org.onosproject.store.service.StorageService;
helenyrwua1c41152016-08-18 16:16:14 -070065import org.onosproject.store.service.Topic;
Madan Jampani0b847532016-03-03 13:44:15 -080066import org.onosproject.store.service.Versioned;
Kavitha Alagesan8dba7992017-01-19 12:32:26 +053067import org.onosproject.store.service.DistributedPrimitive.Status;
alshabibb0285992016-03-28 23:30:37 -070068import org.osgi.service.component.ComponentContext;
alshabib10580802015-02-18 18:30:33 -080069import org.slf4j.Logger;
70
Jonathan Hart6ec029a2015-03-24 17:12:35 -070071import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070072import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080073import java.util.Collections;
alshabibb0285992016-03-28 23:30:37 -070074import java.util.Dictionary;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070075import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080076import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070077import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080079import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080080import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070082import java.util.Optional;
alshabibb0285992016-03-28 23:30:37 -070083import java.util.Properties;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070084import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070085import java.util.concurrent.ConcurrentHashMap;
86import java.util.concurrent.ConcurrentMap;
87import java.util.concurrent.ExecutorService;
88import java.util.concurrent.Executors;
Kavitha Alagesan8dba7992017-01-19 12:32:26 +053089import java.util.concurrent.ScheduledExecutorService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070090import java.util.concurrent.atomic.AtomicInteger;
Kavitha Alagesan8dba7992017-01-19 12:32:26 +053091import java.util.function.Consumer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070092import java.util.stream.Collectors;
93
alshabibb0285992016-03-28 23:30:37 -070094import static com.google.common.base.Strings.isNullOrEmpty;
Kavitha Alagesan8dba7992017-01-19 12:32:26 +053095import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibb0285992016-03-28 23:30:37 -070096import static org.onlab.util.Tools.get;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070097import static org.onlab.util.Tools.groupedThreads;
98import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080099
100/**
Saurav Das0fd79d92016-03-07 10:58:36 -0800101 * Manages inventory of group entries using distributed group stores from the
102 * storage service.
alshabib10580802015-02-18 18:30:33 -0800103 */
104@Component(immediate = true)
105@Service
106public class DistributedGroupStore
107 extends AbstractStore<GroupEvent, GroupStoreDelegate>
108 implements GroupStore {
109
110 private final Logger log = getLogger(getClass());
111
alshabibb0285992016-03-28 23:30:37 -0700112 private static final boolean GARBAGE_COLLECT = false;
113 private static final int GC_THRESH = 6;
114
alshabib10580802015-02-18 18:30:33 -0800115 private final int dummyId = 0xffffffff;
116 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
117
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ClusterCommunicationService clusterCommunicator;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 protected StorageService storageService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700128 protected MastershipService mastershipService;
129
alshabibb0285992016-03-28 23:30:37 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ComponentConfigService cfgService;
132
Kavitha Alagesan8dba7992017-01-19 12:32:26 +0530133 private ScheduledExecutorService executor;
134 private Consumer<Status> statusChangeListener;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800136 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700137 StoredGroupEntry> groupStoreEntriesByKey = null;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700139 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
alshabibb0285992016-03-28 23:30:37 -0700140 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800141 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700142 StoredGroupEntry> auditPendingReqQueue = null;
Frank Wange0eb5ce2016-07-01 18:21:25 +0800143 private MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry>
144 mapListener = new GroupStoreKeyMapListener();
alshabib10580802015-02-18 18:30:33 -0800145 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
146 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 private ExecutorService messageHandlingExecutor;
148 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700149 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800150
151 private final AtomicInteger groupIdGen = new AtomicInteger();
152
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700153 private KryoNamespace clusterMsgSerializer;
154
helenyrwua1c41152016-08-18 16:16:14 -0700155 private static Topic<GroupStoreMessage> groupTopic;
156
alshabibb0285992016-03-28 23:30:37 -0700157 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
158 label = "Enable group garbage collection")
159 private boolean garbageCollect = GARBAGE_COLLECT;
160
161 @Property(name = "gcThresh", intValue = GC_THRESH,
162 label = "Number of rounds for group garbage collection")
163 private int gcThresh = GC_THRESH;
164
165
alshabib10580802015-02-18 18:30:33 -0800166 @Activate
sivachidambaram subramanian247dfd82017-04-12 11:36:12 +0530167 public void activate(ComponentContext context) {
alshabibb0285992016-03-28 23:30:37 -0700168 cfgService.registerProperties(getClass());
sivachidambaram subramanian247dfd82017-04-12 11:36:12 +0530169 modified(context);
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700170 KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
alshabibb0285992016-03-28 23:30:37 -0700171 .register(KryoNamespaces.API)
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700172 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
alshabibb0285992016-03-28 23:30:37 -0700173 .register(DefaultGroup.class,
174 DefaultGroupBucket.class,
175 DefaultGroupDescription.class,
176 DefaultGroupKey.class,
177 GroupDescription.Type.class,
178 Group.GroupState.class,
179 GroupBuckets.class,
alshabibb0285992016-03-28 23:30:37 -0700180 GroupStoreMessage.class,
181 GroupStoreMessage.Type.class,
182 UpdateType.class,
183 GroupStoreMessageSubjects.class,
184 MultiValuedTimestamp.class,
185 GroupStoreKeyMapKey.class,
186 GroupStoreIdMapKey.class,
187 GroupStoreMapKey.class
188 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700189
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700190 clusterMsgSerializer = kryoBuilder.build("GroupStore");
191 Serializer serializer = Serializer.using(clusterMsgSerializer);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700192
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700193 messageHandlingExecutor = Executors.
194 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
195 groupedThreads("onos/store/group",
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700196 "message-handlers",
197 log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700198
199 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
alshabibb0285992016-03-28 23:30:37 -0700200 clusterMsgSerializer::deserialize,
201 this::process,
202 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700203
Madan Jampani0b847532016-03-03 13:44:15 -0800204 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700205
Madan Jampani0b847532016-03-03 13:44:15 -0800206 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
207 .withName("onos-group-store-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700208 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700209 .build();
Frank Wange0eb5ce2016-07-01 18:21:25 +0800210 groupStoreEntriesByKey.addListener(mapListener);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700211 log.debug("Current size of groupstorekeymap:{}",
212 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700213
Kavitha Alagesan8dba7992017-01-19 12:32:26 +0530214 log.debug("Creating GroupStoreId Map From GroupStoreKey Map");
215 matchGroupEntries();
216 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/group", "store", log));
217 statusChangeListener = status -> {
218 if (status == Status.ACTIVE) {
219 executor.execute(this::matchGroupEntries);
220 }
221 };
222 groupStoreEntriesByKey.addStatusChangeListener(statusChangeListener);
223
Madan Jampani0b847532016-03-03 13:44:15 -0800224 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700225
Madan Jampani0b847532016-03-03 13:44:15 -0800226 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
227 .withName("onos-pending-group-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700228 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700229 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700230 log.debug("Current size of pendinggroupkeymap:{}",
231 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700232
helenyrwua1c41152016-08-18 16:16:14 -0700233 groupTopic = getOrCreateGroupTopic(serializer);
234 groupTopic.subscribe(this::processGroupMessage);
235
alshabib10580802015-02-18 18:30:33 -0800236 log.info("Started");
237 }
238
239 @Deactivate
240 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800241 groupStoreEntriesByKey.removeListener(mapListener);
alshabibb0285992016-03-28 23:30:37 -0700242 cfgService.unregisterProperties(getClass(), false);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700243 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
alshabib10580802015-02-18 18:30:33 -0800244 log.info("Stopped");
245 }
246
alshabibb0285992016-03-28 23:30:37 -0700247 @Modified
248 public void modified(ComponentContext context) {
249 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
250
251 try {
252 String s = get(properties, "garbageCollect");
253 garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
254
255 s = get(properties, "gcThresh");
256 gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
257 } catch (Exception e) {
258 gcThresh = GC_THRESH;
259 garbageCollect = GARBAGE_COLLECT;
260 }
261 }
262
helenyrwua1c41152016-08-18 16:16:14 -0700263 private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
264 if (groupTopic == null) {
265 return storageService.getTopic("group-failover-notif", serializer);
266 } else {
267 return groupTopic;
268 }
Sho SHIMIZUa6285542017-01-12 15:08:24 -0800269 }
helenyrwua1c41152016-08-18 16:16:14 -0700270
alshabib10580802015-02-18 18:30:33 -0800271 /**
Kavitha Alagesan8dba7992017-01-19 12:32:26 +0530272 * Updating values of groupEntriesById.
273 */
274 private void matchGroupEntries() {
275 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupStoreEntriesByKey.asJavaMap().entrySet()) {
276 StoredGroupEntry group = entry.getValue();
277 getGroupIdTable(entry.getKey().deviceId()).put(group.id(), group);
278 }
279 }
280
281 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700282 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800283 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700284 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800285 */
Madan Jampani0b847532016-03-03 13:44:15 -0800286 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700287 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800288 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800289 }
290
291 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700292 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800293 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700294 * @param deviceId identifier of the device
295 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800296 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700297 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700298 return groupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800299 }
300
301 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800303 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700304 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800305 */
Madan Jampani0b847532016-03-03 13:44:15 -0800306 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700307 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800308 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800309 }
310
311 /**
312 * Returns the extraneous group id table for specified device.
313 *
314 * @param deviceId identifier of the device
315 * @return Map representing group key table of given device.
316 */
317 private ConcurrentMap<GroupId, Group>
318 getExtraneousGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700319 return extraneousGroupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800320 }
321
322 /**
323 * Returns the number of groups for the specified device in the store.
324 *
325 * @return number of groups for the specified device
326 */
327 @Override
328 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700329 return (getGroups(deviceId) != null) ?
alshabibb0285992016-03-28 23:30:37 -0700330 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800331 }
332
333 /**
334 * Returns the groups associated with a device.
335 *
336 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800337 * @return the group entries
338 */
339 @Override
340 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800341 // Let ImmutableSet.copyOf do the type conversion
342 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800343 }
344
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700345 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800346 NodeId master = mastershipService.getMasterFor(deviceId);
347 if (master == null) {
348 log.debug("Failed to getGroups: No master for {}", deviceId);
349 return Collections.emptySet();
350 }
351
352 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
353 .stream()
354 .filter(input -> input.deviceId().equals(deviceId))
355 .collect(Collectors.toSet());
356 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700357 }
358
alshabib10580802015-02-18 18:30:33 -0800359 /**
360 * Returns the stored group entry.
361 *
alshabibb0285992016-03-28 23:30:37 -0700362 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800363 * @param appCookie the group key
alshabib10580802015-02-18 18:30:33 -0800364 * @return a group associated with the key
365 */
366 @Override
367 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700368 return getStoredGroupEntry(deviceId, appCookie);
369 }
370
371 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
372 GroupKey appCookie) {
373 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
374 appCookie));
375 }
376
377 @Override
378 public Group getGroup(DeviceId deviceId, GroupId groupId) {
379 return getStoredGroupEntry(deviceId, groupId);
380 }
381
382 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
383 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700384 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800385 }
386
387 private int getFreeGroupIdValue(DeviceId deviceId) {
388 int freeId = groupIdGen.incrementAndGet();
389
390 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700391 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800392 if (existing == null) {
393 existing = (
394 extraneousGroupEntriesById.get(deviceId) != null) ?
395 extraneousGroupEntriesById.get(deviceId).
396 get(new DefaultGroupId(freeId)) :
397 null;
398 }
399 if (existing != null) {
400 freeId = groupIdGen.incrementAndGet();
401 } else {
402 break;
403 }
404 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700405 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800406 return freeId;
407 }
408
409 /**
410 * Stores a new group entry using the information from group description.
411 *
412 * @param groupDesc group description to be used to create group entry
413 */
414 @Override
415 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700416 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800417 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800418 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
419 if (existingGroup != null) {
Charles Chan216e3c82016-04-23 14:48:16 -0700420 log.info("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800421 groupDesc.appCookie(), groupDesc.deviceId(),
422 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800423 return;
424 }
425
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700426 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700427 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700428 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700429 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700430 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
431 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700432 + "Can not perform add group operation",
433 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700434 //TODO: Send Group operation failure event
435 return;
436 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700437 GroupStoreMessage groupOp = GroupStoreMessage.
438 createGroupAddRequestMsg(groupDesc.deviceId(),
439 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700440
Madan Jampani175e8fd2015-05-20 14:10:45 -0700441 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700442 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
443 clusterMsgSerializer::serialize,
444 mastershipService.getMasterFor(groupDesc.deviceId()))
445 .whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700446 if (error != null) {
447 log.warn("Failed to send request to master: {} to {}",
alshabibb0285992016-03-28 23:30:37 -0700448 groupOp,
449 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700450 //TODO: Send Group operation failure event
451 } else {
452 log.debug("Sent Group operation request for device {} "
alshabibb0285992016-03-28 23:30:37 -0700453 + "to remote MASTER {}",
454 groupDesc.deviceId(),
455 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700456 }
457 });
alshabib10580802015-02-18 18:30:33 -0800458 return;
459 }
460
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700461 log.debug("Store group for device {} is getting handled locally",
462 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800463 storeGroupDescriptionInternal(groupDesc);
464 }
465
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700466 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
467 ConcurrentMap<GroupId, Group> extraneousMap =
468 extraneousGroupEntriesById.get(deviceId);
469 if (extraneousMap == null) {
470 return null;
471 }
472 return extraneousMap.get(new DefaultGroupId(groupId));
473 }
474
475 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
476 GroupBuckets buckets) {
477 ConcurrentMap<GroupId, Group> extraneousMap =
478 extraneousGroupEntriesById.get(deviceId);
479 if (extraneousMap == null) {
480 return null;
481 }
482
alshabibb0285992016-03-28 23:30:37 -0700483 for (Group extraneousGroup : extraneousMap.values()) {
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700484 if (extraneousGroup.buckets().equals(buckets)) {
485 return extraneousGroup;
486 }
487 }
488 return null;
489 }
490
alshabib10580802015-02-18 18:30:33 -0800491 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
492 // Check if a group is existing with the same key
493 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
494 return;
495 }
496
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700497 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
498 // Device group audit has not completed yet
499 // Add this group description to pending group key table
500 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700501 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700502 groupDesc.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700503 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
504 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800505 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700506 getPendingGroupKeyTable();
507 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
508 groupDesc.appCookie()),
509 group);
510 return;
511 }
512
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700513 Group matchingExtraneousGroup = null;
514 if (groupDesc.givenGroupId() != null) {
515 //Check if there is a extraneous group existing with the same Id
516 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
alshabibb0285992016-03-28 23:30:37 -0700517 groupDesc.deviceId(), groupDesc.givenGroupId());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700518 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800519 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
alshabibb0285992016-03-28 23:30:37 -0700520 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700521 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800522 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700523 //Check if the group buckets matches with user provided buckets
524 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
525 //Group is already existing with the same buckets and Id
526 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800527 log.debug("storeGroupDescriptionInternal: Buckets also matching "
alshabibb0285992016-03-28 23:30:37 -0700528 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700529 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800530 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700531 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700532 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700533 // Insert the newly created group entry into key and id maps
534 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700535 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
536 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700537 // Ensure it also inserted into group id based table to
538 // avoid any chances of duplication in group id generation
539 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700540 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700541 addOrUpdateGroupEntry(matchingExtraneousGroup);
542 removeExtraneousGroupEntry(matchingExtraneousGroup);
543 return;
544 } else {
545 //Group buckets are not matching. Update group
546 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800547 log.debug("storeGroupDescriptionInternal: Buckets are not "
alshabibb0285992016-03-28 23:30:37 -0700548 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700549 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800550 Integer.toHexString(groupDesc.givenGroupId()));
551 StoredGroupEntry modifiedGroup = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700552 matchingExtraneousGroup.id(), groupDesc);
Saurav Das0fd79d92016-03-07 10:58:36 -0800553 modifiedGroup.setState(GroupState.PENDING_UPDATE);
554 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700555 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
556 groupDesc.appCookie()), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800557 // Ensure it also inserted into group id based table to
558 // avoid any chances of duplication in group id generation
559 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700560 put(matchingExtraneousGroup.id(), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800561 removeExtraneousGroupEntry(matchingExtraneousGroup);
562 log.debug("storeGroupDescriptionInternal: Triggering Group "
alshabibb0285992016-03-28 23:30:37 -0700563 + "UPDATE request for {} in device {}",
Saurav Das0fd79d92016-03-07 10:58:36 -0800564 matchingExtraneousGroup.id(),
565 groupDesc.deviceId());
566 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
567 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700568 }
569 }
570 } else {
571 //Check if there is an extraneous group with user provided buckets
572 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
alshabibb0285992016-03-28 23:30:37 -0700573 groupDesc.deviceId(), groupDesc.buckets());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700574 if (matchingExtraneousGroup != null) {
575 //Group is already existing with the same buckets.
576 //So reuse this group.
577 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
578 groupDesc.deviceId());
579 //Create a group entry object
580 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700581 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700582 // Insert the newly created group entry into key and id maps
583 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700584 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
585 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700586 // Ensure it also inserted into group id based table to
587 // avoid any chances of duplication in group id generation
588 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700589 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700590 addOrUpdateGroupEntry(matchingExtraneousGroup);
591 removeExtraneousGroupEntry(matchingExtraneousGroup);
592 return;
593 } else {
594 //TODO: Check if there are any empty groups that can be used here
595 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
596 groupDesc.deviceId());
597 }
598 }
599
Saurav Das100e3b82015-04-30 11:12:10 -0700600 GroupId id = null;
601 if (groupDesc.givenGroupId() == null) {
602 // Get a new group identifier
603 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
604 } else {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800605 // we need to use the identifier passed in by caller, but check if
606 // already used
607 Group existing = getGroup(groupDesc.deviceId(),
608 new DefaultGroupId(groupDesc.givenGroupId()));
609 if (existing != null) {
610 log.warn("Group already exists with the same id: 0x{} in dev:{} "
alshabibb0285992016-03-28 23:30:37 -0700611 + "but with different key: {} (request gkey: {})",
612 Integer.toHexString(groupDesc.givenGroupId()),
613 groupDesc.deviceId(),
614 existing.appCookie(),
615 groupDesc.appCookie());
Saurav Das8be4e3a2016-03-11 17:19:07 -0800616 return;
617 }
Saurav Das100e3b82015-04-30 11:12:10 -0700618 id = new DefaultGroupId(groupDesc.givenGroupId());
619 }
alshabib10580802015-02-18 18:30:33 -0800620 // Create a group entry object
621 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700622 // Insert the newly created group entry into key and id maps
623 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700624 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
625 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700626 // Ensure it also inserted into group id based table to
627 // avoid any chances of duplication in group id generation
628 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700629 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700630 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700631 id,
632 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800633 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
634 group));
635 }
636
637 /**
638 * Updates the existing group entry with the information
639 * from group description.
640 *
alshabibb0285992016-03-28 23:30:37 -0700641 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800642 * @param oldAppCookie the current group key
alshabibb0285992016-03-28 23:30:37 -0700643 * @param type update type
644 * @param newBuckets group buckets for updates
alshabib10580802015-02-18 18:30:33 -0800645 * @param newAppCookie optional new group key
646 */
647 @Override
648 public void updateGroupDescription(DeviceId deviceId,
649 GroupKey oldAppCookie,
650 UpdateType type,
651 GroupBuckets newBuckets,
652 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700653 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700654 if (mastershipService.getMasterFor(deviceId) != null &&
655 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700656 log.debug("updateGroupDescription: Device {} local role is not MASTER",
657 deviceId);
658 if (mastershipService.getMasterFor(deviceId) == null) {
659 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700660 + "Can not perform update group operation",
661 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700662 //TODO: Send Group operation failure event
663 return;
664 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700665 GroupStoreMessage groupOp = GroupStoreMessage.
666 createGroupUpdateRequestMsg(deviceId,
667 oldAppCookie,
668 type,
669 newBuckets,
670 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700671
Madan Jampani175e8fd2015-05-20 14:10:45 -0700672 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700673 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
674 clusterMsgSerializer::serialize,
675 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
676 if (error != null) {
677 log.warn("Failed to send request to master: {} to {}",
678 groupOp,
679 mastershipService.getMasterFor(deviceId), error);
680 }
681 //TODO: Send Group operation failure event
682 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700683 return;
684 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700685 log.debug("updateGroupDescription for device {} is getting handled locally",
686 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700687 updateGroupDescriptionInternal(deviceId,
688 oldAppCookie,
689 type,
690 newBuckets,
691 newAppCookie);
692 }
693
694 private void updateGroupDescriptionInternal(DeviceId deviceId,
alshabibb0285992016-03-28 23:30:37 -0700695 GroupKey oldAppCookie,
696 UpdateType type,
697 GroupBuckets newBuckets,
698 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800699 // Check if a group is existing with the provided key
700 Group oldGroup = getGroup(deviceId, oldAppCookie);
701 if (oldGroup == null) {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800702 log.warn("updateGroupDescriptionInternal: Group not found...strange. "
alshabibb0285992016-03-28 23:30:37 -0700703 + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
alshabib10580802015-02-18 18:30:33 -0800704 return;
705 }
706
707 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
708 type,
709 newBuckets);
710 if (newBucketList != null) {
711 // Create a new group object from the old group
712 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
713 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
714 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
715 oldGroup.deviceId(),
716 oldGroup.type(),
717 updatedBuckets,
718 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700719 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800720 oldGroup.appId());
721 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
722 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700723 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
alshabibb0285992016-03-28 23:30:37 -0700724 oldGroup.id(),
725 oldGroup.deviceId(),
726 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800727 newGroup.setState(GroupState.PENDING_UPDATE);
728 newGroup.setLife(oldGroup.life());
729 newGroup.setPackets(oldGroup.packets());
730 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700731 //Update the group entry in groupkey based map.
732 //Update to groupid based map will happen in the
733 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700734 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
735 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700736 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700737 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
738 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800739 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700740 } else {
741 log.warn("updateGroupDescriptionInternal with type {}: No "
alshabibb0285992016-03-28 23:30:37 -0700742 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800743 }
744 }
745
746 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
747 UpdateType type,
748 GroupBuckets buckets) {
Victor Silva0282ab82016-11-15 16:30:27 -0300749 if (type == UpdateType.SET) {
750 return buckets.buckets();
751 }
752
Victor Silvadf1eeae2016-08-12 15:28:57 -0300753 List<GroupBucket> oldBuckets = oldGroup.buckets().buckets();
754 List<GroupBucket> updatedBucketList = new ArrayList<>();
alshabib10580802015-02-18 18:30:33 -0800755 boolean groupDescUpdated = false;
756
757 if (type == UpdateType.ADD) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300758 List<GroupBucket> newBuckets = buckets.buckets();
759
760 // Add old buckets that will not be updated and check if any will be updated.
761 for (GroupBucket oldBucket : oldBuckets) {
762 int newBucketIndex = newBuckets.indexOf(oldBucket);
763
764 if (newBucketIndex != -1) {
765 GroupBucket newBucket = newBuckets.get(newBucketIndex);
766 if (!newBucket.hasSameParameters(oldBucket)) {
767 // Bucket will be updated
768 groupDescUpdated = true;
769 }
770 } else {
771 // Old bucket will remain the same - add it.
772 updatedBucketList.add(oldBucket);
alshabib10580802015-02-18 18:30:33 -0800773 }
774 }
Victor Silvadf1eeae2016-08-12 15:28:57 -0300775
776 // Add all new buckets
777 updatedBucketList.addAll(newBuckets);
778 if (!oldBuckets.containsAll(newBuckets)) {
779 groupDescUpdated = true;
780 }
781
alshabib10580802015-02-18 18:30:33 -0800782 } else if (type == UpdateType.REMOVE) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300783 List<GroupBucket> bucketsToRemove = buckets.buckets();
784
785 // Check which old buckets should remain
786 for (GroupBucket oldBucket : oldBuckets) {
787 if (!bucketsToRemove.contains(oldBucket)) {
788 updatedBucketList.add(oldBucket);
789 } else {
alshabib10580802015-02-18 18:30:33 -0800790 groupDescUpdated = true;
791 }
792 }
793 }
794
795 if (groupDescUpdated) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300796 return updatedBucketList;
alshabib10580802015-02-18 18:30:33 -0800797 } else {
798 return null;
799 }
800 }
801
802 /**
803 * Triggers deleting the existing group entry.
804 *
alshabibb0285992016-03-28 23:30:37 -0700805 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800806 * @param appCookie the group key
807 */
808 @Override
809 public void deleteGroupDescription(DeviceId deviceId,
810 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700811 // Check if group to be deleted by a remote instance
812 if (mastershipService.
813 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700814 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
815 deviceId);
816 if (mastershipService.getMasterFor(deviceId) == null) {
817 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700818 + "Can not perform delete group operation",
819 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700820 //TODO: Send Group operation failure event
821 return;
822 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700823 GroupStoreMessage groupOp = GroupStoreMessage.
824 createGroupDeleteRequestMsg(deviceId,
825 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700826
Madan Jampani175e8fd2015-05-20 14:10:45 -0700827 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700828 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
829 clusterMsgSerializer::serialize,
830 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
831 if (error != null) {
832 log.warn("Failed to send request to master: {} to {}",
833 groupOp,
834 mastershipService.getMasterFor(deviceId), error);
835 }
836 //TODO: Send Group operation failure event
837 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700838 return;
839 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700840 log.debug("deleteGroupDescription in device {} is getting handled locally",
841 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700842 deleteGroupDescriptionInternal(deviceId, appCookie);
843 }
844
845 private void deleteGroupDescriptionInternal(DeviceId deviceId,
846 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800847 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700848 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800849 if (existing == null) {
850 return;
851 }
852
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700853 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
alshabibb0285992016-03-28 23:30:37 -0700854 existing.id(),
855 existing.deviceId(),
856 existing.state());
alshabib10580802015-02-18 18:30:33 -0800857 synchronized (existing) {
858 existing.setState(GroupState.PENDING_DELETE);
Saurav Das80980c72016-03-23 11:22:49 -0700859 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700860 put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
861 existing);
alshabib10580802015-02-18 18:30:33 -0800862 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700863 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
864 deviceId);
alshabib10580802015-02-18 18:30:33 -0800865 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
866 }
867
868 /**
869 * Stores a new group entry, or updates an existing entry.
870 *
871 * @param group group entry
872 */
873 @Override
874 public void addOrUpdateGroupEntry(Group group) {
875 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700876 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
877 group.id());
alshabib10580802015-02-18 18:30:33 -0800878 GroupEvent event = null;
879
880 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800881 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700882 group.id(),
883 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800884 synchronized (existing) {
alshabibb0285992016-03-28 23:30:37 -0700885 for (GroupBucket bucket : group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700886 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700887 existing.buckets().buckets()
alshabibb0285992016-03-28 23:30:37 -0700888 .stream()
889 .filter((existingBucket) -> (existingBucket.equals(bucket)))
890 .findFirst();
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700891 if (matchingBucket.isPresent()) {
892 ((StoredGroupBucketEntry) matchingBucket.
893 get()).setPackets(bucket.packets());
894 ((StoredGroupBucketEntry) matchingBucket.
895 get()).setBytes(bucket.bytes());
896 } else {
897 log.warn("addOrUpdateGroupEntry: No matching "
alshabibb0285992016-03-28 23:30:37 -0700898 + "buckets to update stats");
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700899 }
900 }
alshabib10580802015-02-18 18:30:33 -0800901 existing.setLife(group.life());
902 existing.setPackets(group.packets());
903 existing.setBytes(group.bytes());
alshabibb0285992016-03-28 23:30:37 -0700904 existing.setReferenceCount(group.referenceCount());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700905 if ((existing.state() == GroupState.PENDING_ADD) ||
alshabibb0285992016-03-28 23:30:37 -0700906 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800907 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700908 existing.id(),
909 existing.deviceId(),
910 existing.state());
alshabib10580802015-02-18 18:30:33 -0800911 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700912 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800913 event = new GroupEvent(Type.GROUP_ADDED, existing);
914 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800915 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700916 existing.id(),
917 existing.deviceId(),
918 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700919 existing.setState(GroupState.ADDED);
920 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800921 event = new GroupEvent(Type.GROUP_UPDATED, existing);
922 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700923 //Re-PUT map entries to trigger map update events
924 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700925 put(new GroupStoreKeyMapKey(existing.deviceId(),
926 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800927 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700928 } else {
929 log.warn("addOrUpdateGroupEntry: Group update "
alshabibb0285992016-03-28 23:30:37 -0700930 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800931 }
932
933 if (event != null) {
934 notifyDelegate(event);
935 }
936 }
937
938 /**
939 * Removes the group entry from store.
940 *
941 * @param group group entry
942 */
943 @Override
944 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700945 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
946 group.id());
alshabib10580802015-02-18 18:30:33 -0800947
948 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700949 log.debug("removeGroupEntry: removing group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700950 group.id(),
951 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700952 //Removal from groupid based map will happen in the
953 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700954 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
955 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800956 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700957 } else {
958 log.warn("removeGroupEntry for {} in device{} is "
alshabibb0285992016-03-28 23:30:37 -0700959 + "not existing in our maps",
960 group.id(),
961 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800962 }
963 }
964
Victor Silva4e8b7832016-08-17 17:11:19 -0300965 private void purgeGroupEntries(Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entries) {
966 entries.forEach(entry -> {
967 groupStoreEntriesByKey.remove(entry.getKey());
968 });
969 }
970
alshabib10580802015-02-18 18:30:33 -0800971 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800972 public void purgeGroupEntry(DeviceId deviceId) {
Victor Silva4e8b7832016-08-17 17:11:19 -0300973 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
Charles Chan0c7c43b2016-01-14 17:39:20 -0800974 new HashSet<>();
975
Madan Jampani0b847532016-03-03 13:44:15 -0800976 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -0800977 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Victor Silva4e8b7832016-08-17 17:11:19 -0300978 .forEach(entriesPendingRemove::add);
Charles Chan0c7c43b2016-01-14 17:39:20 -0800979
Victor Silva4e8b7832016-08-17 17:11:19 -0300980 purgeGroupEntries(entriesPendingRemove);
981 }
982
983 @Override
984 public void purgeGroupEntries() {
985 purgeGroupEntries(getGroupStoreKeyMap().entrySet());
Charles Chan0c7c43b2016-01-14 17:39:20 -0800986 }
987
988 @Override
alshabib10580802015-02-18 18:30:33 -0800989 public void deviceInitialAuditCompleted(DeviceId deviceId,
990 boolean completed) {
991 synchronized (deviceAuditStatus) {
992 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700993 log.debug("AUDIT completed for device {}",
994 deviceId);
alshabib10580802015-02-18 18:30:33 -0800995 deviceAuditStatus.put(deviceId, true);
996 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700997 List<StoredGroupEntry> pendingGroupRequests =
998 getPendingGroupKeyTable().values()
alshabibb0285992016-03-28 23:30:37 -0700999 .stream()
1000 .filter(g -> g.deviceId().equals(deviceId))
1001 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001002 log.debug("processing pending group add requests for device {} and number of pending requests {}",
alshabibb0285992016-03-28 23:30:37 -07001003 deviceId,
1004 pendingGroupRequests.size());
1005 for (Group group : pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -08001006 GroupDescription tmp = new DefaultGroupDescription(
1007 group.deviceId(),
1008 group.type(),
1009 group.buckets(),
1010 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -07001011 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -08001012 group.appId());
1013 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001014 getPendingGroupKeyTable().
alshabibb0285992016-03-28 23:30:37 -07001015 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -08001016 }
alshabib10580802015-02-18 18:30:33 -08001017 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001018 Boolean audited = deviceAuditStatus.get(deviceId);
1019 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001020 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -08001021 deviceAuditStatus.put(deviceId, false);
1022 }
1023 }
1024 }
1025 }
1026
1027 @Override
1028 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
1029 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001030 Boolean audited = deviceAuditStatus.get(deviceId);
1031 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -08001032 }
1033 }
1034
1035 @Override
1036 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
1037
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001038 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
1039 operation.groupId());
alshabib10580802015-02-18 18:30:33 -08001040
1041 if (existing == null) {
1042 log.warn("No group entry with ID {} found ", operation.groupId());
1043 return;
1044 }
1045
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001046 log.warn("groupOperationFailed: group operation {} failed"
alshabibb0285992016-03-28 23:30:37 -07001047 + "for group {} in device {} with code {}",
1048 operation.opType(),
1049 existing.id(),
1050 existing.deviceId(),
1051 operation.failureCode());
Saurav Das0fd79d92016-03-07 10:58:36 -08001052 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
1053 log.warn("Current extraneous groups in device:{} are: {}",
1054 deviceId,
1055 getExtraneousGroups(deviceId));
Saurav Das8be4e3a2016-03-11 17:19:07 -08001056 if (operation.buckets().equals(existing.buckets())) {
1057 if (existing.state() == GroupState.PENDING_ADD) {
1058 log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
alshabibb0285992016-03-28 23:30:37 -07001059 + "add state - moving to ADDED for group {} in device {}",
1060 existing.id(), deviceId);
Saurav Das8be4e3a2016-03-11 17:19:07 -08001061 addOrUpdateGroupEntry(existing);
1062 return;
1063 } else {
1064 log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
alshabibb0285992016-03-28 23:30:37 -07001065 + "Operation: {} Existing: {}", operation.buckets(),
1066 existing.buckets());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001067 }
1068 }
Saurav Das0fd79d92016-03-07 10:58:36 -08001069 }
alshabib10580802015-02-18 18:30:33 -08001070 switch (operation.opType()) {
1071 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001072 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001073 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
1074 log.warn("groupOperationFailed: cleaningup "
alshabibb0285992016-03-28 23:30:37 -07001075 + "group {} from store in device {}....",
1076 existing.id(),
1077 existing.deviceId());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001078 //Removal from groupid based map will happen in the
1079 //map update listener
1080 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
1081 existing.appCookie()));
1082 }
alshabib10580802015-02-18 18:30:33 -08001083 break;
1084 case MODIFY:
1085 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
1086 break;
1087 case DELETE:
1088 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
1089 break;
1090 default:
1091 log.warn("Unknown group operation type {}", operation.opType());
1092 }
alshabib10580802015-02-18 18:30:33 -08001093 }
1094
1095 @Override
1096 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001097 log.debug("add/update extraneous group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001098 group.id(),
1099 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001100 ConcurrentMap<GroupId, Group> extraneousIdTable =
1101 getExtraneousGroupIdTable(group.deviceId());
1102 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -07001103 // Don't remove the extraneous groups, instead re-use it when
1104 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -08001105 }
1106
1107 @Override
1108 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001109 log.debug("remove extraneous group entry {} of device {} from store",
alshabibb0285992016-03-28 23:30:37 -07001110 group.id(),
1111 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001112 ConcurrentMap<GroupId, Group> extraneousIdTable =
1113 getExtraneousGroupIdTable(group.deviceId());
1114 extraneousIdTable.remove(group.id());
1115 }
1116
1117 @Override
1118 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
1119 // flatten and make iterator unmodifiable
1120 return FluentIterable.from(
1121 getExtraneousGroupIdTable(deviceId).values());
1122 }
1123
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001124 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001125 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001126 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001127 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001128 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001129
1130 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001131 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001132 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001133 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001134 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001135 if ((key == null) && (group == null)) {
1136 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001137 + "event {} with null entry", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001138 return;
1139 } else if (group == null) {
1140 group = getGroupIdTable(key.deviceId()).values()
1141 .stream()
1142 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
Yuta HIGUCHI6e5f4702016-11-21 11:42:11 -08001143 .findFirst().orElse(null);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001144 if (group == null) {
1145 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001146 + "event {} with null entry... can not process", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001147 return;
1148 }
1149 }
1150 log.trace("received groupid map event {} for id {} in device {}",
1151 mapEvent.type(),
1152 group.id(),
jaegonkim68e080c2016-12-01 22:31:01 +09001153 (key != null ? key.deviceId() : null));
Madan Jampani0b847532016-03-03 13:44:15 -08001154 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001155 // Update the group ID table
1156 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001157 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1158 if (value.state() == Group.GroupState.ADDED) {
1159 if (value.isGroupStateAddedFirstTime()) {
1160 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001161 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001162 group.id(),
1163 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001164 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001165 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001166 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001167 group.id(),
1168 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001169 }
1170 }
Madan Jampani0b847532016-03-03 13:44:15 -08001171 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001172 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001173 // Remove the entry from the group ID table
1174 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001175 }
1176
1177 if (groupEvent != null) {
1178 notifyDelegate(groupEvent);
1179 }
1180 }
1181 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001182
helenyrwua1c41152016-08-18 16:16:14 -07001183 private void processGroupMessage(GroupStoreMessage message) {
1184 if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1185 // FIXME: groupStoreEntriesByKey inaccessible here
1186 getGroupIdTable(message.deviceId()).values()
1187 .stream()
1188 .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1189 .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1190 }
1191 }
1192
Madan Jampani01e05fb2015-08-13 13:29:36 -07001193 private void process(GroupStoreMessage groupOp) {
1194 log.debug("Received remote group operation {} request for device {}",
alshabibb0285992016-03-28 23:30:37 -07001195 groupOp.type(),
1196 groupOp.deviceId());
1197 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1198 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1199 return;
1200 }
1201 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1202 storeGroupDescriptionInternal(groupOp.groupDesc());
1203 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1204 updateGroupDescriptionInternal(groupOp.deviceId(),
1205 groupOp.appCookie(),
1206 groupOp.updateType(),
1207 groupOp.updateBuckets(),
1208 groupOp.newAppCookie());
1209 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1210 deleteGroupDescriptionInternal(groupOp.deviceId(),
1211 groupOp.appCookie());
1212 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001213 }
1214
1215 /**
1216 * Flattened map key to be used to store group entries.
1217 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001218 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001219 private final DeviceId deviceId;
1220
1221 public GroupStoreMapKey(DeviceId deviceId) {
1222 this.deviceId = deviceId;
1223 }
1224
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001225 public DeviceId deviceId() {
1226 return deviceId;
1227 }
1228
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001229 @Override
1230 public boolean equals(Object o) {
1231 if (this == o) {
1232 return true;
1233 }
1234 if (!(o instanceof GroupStoreMapKey)) {
1235 return false;
1236 }
1237 GroupStoreMapKey that = (GroupStoreMapKey) o;
1238 return this.deviceId.equals(that.deviceId);
1239 }
1240
1241 @Override
1242 public int hashCode() {
1243 int result = 17;
1244
1245 result = 31 * result + Objects.hash(this.deviceId);
1246
1247 return result;
1248 }
1249 }
1250
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001251 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001252 private final GroupKey appCookie;
alshabibb0285992016-03-28 23:30:37 -07001253
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001254 public GroupStoreKeyMapKey(DeviceId deviceId,
1255 GroupKey appCookie) {
1256 super(deviceId);
1257 this.appCookie = appCookie;
1258 }
1259
1260 @Override
1261 public boolean equals(Object o) {
1262 if (this == o) {
1263 return true;
1264 }
1265 if (!(o instanceof GroupStoreKeyMapKey)) {
1266 return false;
1267 }
1268 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1269 return (super.equals(that) &&
1270 this.appCookie.equals(that.appCookie));
1271 }
1272
1273 @Override
1274 public int hashCode() {
1275 int result = 17;
1276
1277 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1278
1279 return result;
1280 }
1281 }
1282
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001283 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001284 private final GroupId groupId;
alshabibb0285992016-03-28 23:30:37 -07001285
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001286 public GroupStoreIdMapKey(DeviceId deviceId,
1287 GroupId groupId) {
1288 super(deviceId);
1289 this.groupId = groupId;
1290 }
1291
1292 @Override
1293 public boolean equals(Object o) {
1294 if (this == o) {
1295 return true;
1296 }
1297 if (!(o instanceof GroupStoreIdMapKey)) {
1298 return false;
1299 }
1300 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1301 return (super.equals(that) &&
1302 this.groupId.equals(that.groupId));
1303 }
1304
1305 @Override
1306 public int hashCode() {
1307 int result = 17;
1308
1309 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1310
1311 return result;
1312 }
1313 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001314
1315 @Override
1316 public void pushGroupMetrics(DeviceId deviceId,
1317 Collection<Group> groupEntries) {
1318 boolean deviceInitialAuditStatus =
1319 deviceInitialAuditStatus(deviceId);
1320 Set<Group> southboundGroupEntries =
1321 Sets.newHashSet(groupEntries);
1322 Set<StoredGroupEntry> storedGroupEntries =
1323 Sets.newHashSet(getStoredGroups(deviceId));
1324 Set<Group> extraneousStoredEntries =
1325 Sets.newHashSet(getExtraneousGroups(deviceId));
1326
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001327 if (log.isTraceEnabled()) {
1328 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1329 southboundGroupEntries.size(),
1330 deviceId);
1331 for (Group group : southboundGroupEntries) {
1332 log.trace("Group {} in device {}", group, deviceId);
1333 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001334
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001335 log.trace("Displaying all ({}) stored group entries for device {}",
1336 storedGroupEntries.size(),
1337 deviceId);
1338 for (StoredGroupEntry group : storedGroupEntries) {
1339 log.trace("Stored Group {} for device {}", group, deviceId);
1340 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001341 }
1342
alshabibb0285992016-03-28 23:30:37 -07001343 garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
1344
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001345 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1346 Group group = it2.next();
1347 if (storedGroupEntries.remove(group)) {
1348 // we both have the group, let's update some info then.
1349 log.trace("Group AUDIT: group {} exists in both planes for device {}",
alshabibb0285992016-03-28 23:30:37 -07001350 group.id(), deviceId);
1351
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001352 groupAdded(group);
1353 it2.remove();
1354 }
1355 }
1356 for (Group group : southboundGroupEntries) {
1357 if (getGroup(group.deviceId(), group.id()) != null) {
1358 // There is a group existing with the same id
1359 // It is possible that group update is
1360 // in progress while we got a stale info from switch
1361 if (!storedGroupEntries.remove(getGroup(
alshabibb0285992016-03-28 23:30:37 -07001362 group.deviceId(), group.id()))) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001363 log.warn("Group AUDIT: Inconsistent state:"
alshabibb0285992016-03-28 23:30:37 -07001364 + "Group exists in ID based table while "
1365 + "not present in key based table");
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001366 }
1367 } else {
1368 // there are groups in the switch that aren't in the store
1369 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001370 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001371 extraneousStoredEntries.remove(group);
1372 extraneousGroup(group);
1373 }
1374 }
1375 for (Group group : storedGroupEntries) {
1376 // there are groups in the store that aren't in the switch
1377 log.debug("Group AUDIT: group {} missing in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001378 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001379 groupMissing(group);
1380 }
1381 for (Group group : extraneousStoredEntries) {
1382 // there are groups in the extraneous store that
1383 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001384 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
alshabibb0285992016-03-28 23:30:37 -07001385 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001386 removeExtraneousGroupEntry(group);
1387 }
1388
1389 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001390 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
alshabibb0285992016-03-28 23:30:37 -07001391 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001392 deviceInitialAuditCompleted(deviceId, true);
1393 }
1394 }
1395
helenyrwu89470f12016-08-12 13:18:10 -07001396 @Override
1397 public void notifyOfFailovers(Collection<Group> failoverGroups) {
helenyrwu89470f12016-08-12 13:18:10 -07001398 failoverGroups.forEach(group -> {
1399 if (group.type() == Group.Type.FAILOVER) {
helenyrwua1c41152016-08-18 16:16:14 -07001400 groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1401 group.deviceId(), group));
helenyrwu89470f12016-08-12 13:18:10 -07001402 }
1403 });
helenyrwu89470f12016-08-12 13:18:10 -07001404 }
1405
alshabibb0285992016-03-28 23:30:37 -07001406 private void garbageCollect(DeviceId deviceId,
1407 Set<Group> southboundGroupEntries,
1408 Set<StoredGroupEntry> storedGroupEntries) {
1409 if (!garbageCollect) {
1410 return;
1411 }
1412
1413 Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
1414 while (it.hasNext()) {
1415 StoredGroupEntry group = it.next();
1416 if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
1417 log.debug("Garbage collecting group {} on {}", group, deviceId);
1418 deleteGroupDescription(deviceId, group.appCookie());
1419 southboundGroupEntries.remove(group);
1420 it.remove();
1421 }
1422 }
1423 }
1424
1425 private boolean checkGroupRefCount(Group group) {
1426 return (group.referenceCount() == 0 && group.age() >= gcThresh);
1427 }
1428
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001429 private void groupMissing(Group group) {
1430 switch (group.state()) {
1431 case PENDING_DELETE:
1432 log.debug("Group {} delete confirmation from device {}",
1433 group, group.deviceId());
1434 removeGroupEntry(group);
1435 break;
1436 case ADDED:
1437 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001438 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001439 case PENDING_UPDATE:
1440 log.debug("Group {} is in store but not on device {}",
1441 group, group.deviceId());
1442 StoredGroupEntry existing =
1443 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001444 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
alshabibb0285992016-03-28 23:30:37 -07001445 existing.id(),
1446 existing.deviceId(),
1447 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001448 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001449 //Re-PUT map entries to trigger map update events
1450 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -07001451 put(new GroupStoreKeyMapKey(existing.deviceId(),
1452 existing.appCookie()), existing);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001453 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1454 group));
1455 break;
1456 default:
1457 log.debug("Group {} has not been installed.", group);
1458 break;
1459 }
1460 }
1461
1462 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001463 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001464 group, group.deviceId());
1465 addOrUpdateExtraneousGroupEntry(group);
1466 }
1467
1468 private void groupAdded(Group group) {
1469 log.trace("Group {} Added or Updated in device {}",
1470 group, group.deviceId());
1471 addOrUpdateGroupEntry(group);
1472 }
alshabib10580802015-02-18 18:30:33 -08001473}