blob: 2fd45f7b65b63ce5610c2c3feb5265b6d467afda [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib10580802015-02-18 18:30:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
alshabibb0285992016-03-28 23:30:37 -070025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080029import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onlab.util.KryoNamespace;
alshabibb0285992016-03-28 23:30:37 -070031import org.onosproject.cfg.ComponentConfigService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080033import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070057import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080058import org.onosproject.store.service.ConsistentMap;
59import org.onosproject.store.service.MapEvent;
60import org.onosproject.store.service.MapEventListener;
alshabibb0285992016-03-28 23:30:37 -070061import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani0b847532016-03-03 13:44:15 -080062import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070063import org.onosproject.store.service.StorageService;
helenyrwua1c41152016-08-18 16:16:14 -070064import org.onosproject.store.service.Topic;
Madan Jampani0b847532016-03-03 13:44:15 -080065import org.onosproject.store.service.Versioned;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053066import org.onosproject.store.service.DistributedPrimitive.Status;
alshabibb0285992016-03-28 23:30:37 -070067import org.osgi.service.component.ComponentContext;
alshabib10580802015-02-18 18:30:33 -080068import org.slf4j.Logger;
69
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080072import java.util.Collections;
alshabibb0285992016-03-28 23:30:37 -070073import java.util.Dictionary;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080075import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080078import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080079import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070080import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070081import java.util.Optional;
alshabibb0285992016-03-28 23:30:37 -070082import java.util.Properties;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070083import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070084import java.util.concurrent.ConcurrentHashMap;
85import java.util.concurrent.ConcurrentMap;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053088import java.util.concurrent.ScheduledExecutorService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070089import java.util.concurrent.atomic.AtomicInteger;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053090import java.util.function.Consumer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070091import java.util.stream.Collectors;
92
alshabibb0285992016-03-28 23:30:37 -070093import static com.google.common.base.Strings.isNullOrEmpty;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053094import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibb0285992016-03-28 23:30:37 -070095import static org.onlab.util.Tools.get;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070096import static org.onlab.util.Tools.groupedThreads;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080098
99/**
Saurav Das0fd79d92016-03-07 10:58:36 -0800100 * Manages inventory of group entries using distributed group stores from the
101 * storage service.
alshabib10580802015-02-18 18:30:33 -0800102 */
103@Component(immediate = true)
104@Service
105public class DistributedGroupStore
106 extends AbstractStore<GroupEvent, GroupStoreDelegate>
107 implements GroupStore {
108
109 private final Logger log = getLogger(getClass());
110
alshabibb0285992016-03-28 23:30:37 -0700111 private static final boolean GARBAGE_COLLECT = false;
112 private static final int GC_THRESH = 6;
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530113 private static final boolean ALLOW_EXTRANEOUS_GROUPS = true;
alshabibb0285992016-03-28 23:30:37 -0700114
alshabib10580802015-02-18 18:30:33 -0800115 private final int dummyId = 0xffffffff;
Yi Tsengfa394de2017-02-01 11:26:40 -0800116 private final GroupId dummyGroupId = new GroupId(dummyId);
alshabib10580802015-02-18 18:30:33 -0800117
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ClusterCommunicationService clusterCommunicator;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 protected StorageService storageService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700128 protected MastershipService mastershipService;
129
alshabibb0285992016-03-28 23:30:37 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ComponentConfigService cfgService;
132
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530133 private ScheduledExecutorService executor;
134 private Consumer<Status> statusChangeListener;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800136 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700137 StoredGroupEntry> groupStoreEntriesByKey = null;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700139 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
alshabibb0285992016-03-28 23:30:37 -0700140 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800141 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700142 StoredGroupEntry> auditPendingReqQueue = null;
Frank Wange0eb5ce2016-07-01 18:21:25 +0800143 private MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry>
144 mapListener = new GroupStoreKeyMapListener();
alshabib10580802015-02-18 18:30:33 -0800145 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
146 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 private ExecutorService messageHandlingExecutor;
148 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700149 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800150
151 private final AtomicInteger groupIdGen = new AtomicInteger();
152
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700153 private KryoNamespace clusterMsgSerializer;
154
helenyrwua1c41152016-08-18 16:16:14 -0700155 private static Topic<GroupStoreMessage> groupTopic;
156
alshabibb0285992016-03-28 23:30:37 -0700157 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
158 label = "Enable group garbage collection")
159 private boolean garbageCollect = GARBAGE_COLLECT;
160
161 @Property(name = "gcThresh", intValue = GC_THRESH,
162 label = "Number of rounds for group garbage collection")
163 private int gcThresh = GC_THRESH;
164
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530165 @Property(name = "allowExtraneousGroups", boolValue = ALLOW_EXTRANEOUS_GROUPS,
166 label = "Allow groups in switches not installed by ONOS")
167 private boolean allowExtraneousGroups = ALLOW_EXTRANEOUS_GROUPS;
alshabibb0285992016-03-28 23:30:37 -0700168
alshabib10580802015-02-18 18:30:33 -0800169 @Activate
170 public void activate() {
alshabibb0285992016-03-28 23:30:37 -0700171 cfgService.registerProperties(getClass());
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700172 KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
alshabibb0285992016-03-28 23:30:37 -0700173 .register(KryoNamespaces.API)
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700174 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
alshabibb0285992016-03-28 23:30:37 -0700175 .register(DefaultGroup.class,
176 DefaultGroupBucket.class,
177 DefaultGroupDescription.class,
178 DefaultGroupKey.class,
179 GroupDescription.Type.class,
180 Group.GroupState.class,
181 GroupBuckets.class,
alshabibb0285992016-03-28 23:30:37 -0700182 GroupStoreMessage.class,
183 GroupStoreMessage.Type.class,
184 UpdateType.class,
185 GroupStoreMessageSubjects.class,
186 MultiValuedTimestamp.class,
187 GroupStoreKeyMapKey.class,
188 GroupStoreIdMapKey.class,
189 GroupStoreMapKey.class
190 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700191
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700192 clusterMsgSerializer = kryoBuilder.build("GroupStore");
193 Serializer serializer = Serializer.using(clusterMsgSerializer);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700194
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700195 messageHandlingExecutor = Executors.
196 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
197 groupedThreads("onos/store/group",
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700198 "message-handlers",
199 log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700200
201 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
alshabibb0285992016-03-28 23:30:37 -0700202 clusterMsgSerializer::deserialize,
203 this::process,
204 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700205
Madan Jampani0b847532016-03-03 13:44:15 -0800206 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700207
Madan Jampani0b847532016-03-03 13:44:15 -0800208 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
209 .withName("onos-group-store-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700210 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700211 .build();
Frank Wange0eb5ce2016-07-01 18:21:25 +0800212 groupStoreEntriesByKey.addListener(mapListener);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700213 log.debug("Current size of groupstorekeymap:{}",
214 groupStoreEntriesByKey.size());
Thiago Santosfb73c502016-08-18 18:15:13 -0300215 synchronizeGroupStoreEntries();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700216
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530217 log.debug("Creating GroupStoreId Map From GroupStoreKey Map");
218 matchGroupEntries();
219 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/group", "store", log));
220 statusChangeListener = status -> {
221 if (status == Status.ACTIVE) {
222 executor.execute(this::matchGroupEntries);
223 }
224 };
225 groupStoreEntriesByKey.addStatusChangeListener(statusChangeListener);
226
Madan Jampani0b847532016-03-03 13:44:15 -0800227 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700228
Madan Jampani0b847532016-03-03 13:44:15 -0800229 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
230 .withName("onos-pending-group-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700231 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700232 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700233 log.debug("Current size of pendinggroupkeymap:{}",
234 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700235
helenyrwua1c41152016-08-18 16:16:14 -0700236 groupTopic = getOrCreateGroupTopic(serializer);
237 groupTopic.subscribe(this::processGroupMessage);
238
alshabib10580802015-02-18 18:30:33 -0800239 log.info("Started");
240 }
241
242 @Deactivate
243 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800244 groupStoreEntriesByKey.removeListener(mapListener);
alshabibb0285992016-03-28 23:30:37 -0700245 cfgService.unregisterProperties(getClass(), false);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700246 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
alshabib10580802015-02-18 18:30:33 -0800247 log.info("Stopped");
248 }
249
alshabibb0285992016-03-28 23:30:37 -0700250 @Modified
251 public void modified(ComponentContext context) {
252 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
253
254 try {
255 String s = get(properties, "garbageCollect");
256 garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
257
258 s = get(properties, "gcThresh");
259 gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530260
261 s = get(properties, "allowExtraneousGroups");
262 allowExtraneousGroups = isNullOrEmpty(s) ? ALLOW_EXTRANEOUS_GROUPS : Boolean.parseBoolean(s.trim());
alshabibb0285992016-03-28 23:30:37 -0700263 } catch (Exception e) {
264 gcThresh = GC_THRESH;
265 garbageCollect = GARBAGE_COLLECT;
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530266 allowExtraneousGroups = ALLOW_EXTRANEOUS_GROUPS;
alshabibb0285992016-03-28 23:30:37 -0700267 }
268 }
269
helenyrwua1c41152016-08-18 16:16:14 -0700270 private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
271 if (groupTopic == null) {
272 return storageService.getTopic("group-failover-notif", serializer);
273 } else {
274 return groupTopic;
275 }
Sho SHIMIZUa6285542017-01-12 15:08:24 -0800276 }
helenyrwua1c41152016-08-18 16:16:14 -0700277
alshabib10580802015-02-18 18:30:33 -0800278 /**
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530279 * Updating values of groupEntriesById.
280 */
281 private void matchGroupEntries() {
282 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupStoreEntriesByKey.asJavaMap().entrySet()) {
283 StoredGroupEntry group = entry.getValue();
284 getGroupIdTable(entry.getKey().deviceId()).put(group.id(), group);
285 }
286 }
287
Thiago Santosfb73c502016-08-18 18:15:13 -0300288
289 private void synchronizeGroupStoreEntries() {
290 Map<GroupStoreKeyMapKey, StoredGroupEntry> groupEntryMap = groupStoreEntriesByKey.asJavaMap();
291 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupEntryMap.entrySet()) {
Thiago Santosfb73c502016-08-18 18:15:13 -0300292 StoredGroupEntry value = entry.getValue();
Thiago Santosfb73c502016-08-18 18:15:13 -0300293 ConcurrentMap<GroupId, StoredGroupEntry> groupIdTable = getGroupIdTable(value.deviceId());
294 groupIdTable.put(value.id(), value);
295 }
296 }
297
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530298 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700299 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800300 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700301 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800302 */
Madan Jampani0b847532016-03-03 13:44:15 -0800303 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700304 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800305 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800306 }
307
308 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700309 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800310 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700311 * @param deviceId identifier of the device
312 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800313 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700314 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700315 return groupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800316 }
317
318 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700319 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800320 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700321 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800322 */
Madan Jampani0b847532016-03-03 13:44:15 -0800323 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700324 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800325 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800326 }
327
328 /**
329 * Returns the extraneous group id table for specified device.
330 *
331 * @param deviceId identifier of the device
332 * @return Map representing group key table of given device.
333 */
334 private ConcurrentMap<GroupId, Group>
335 getExtraneousGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700336 return extraneousGroupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800337 }
338
339 /**
340 * Returns the number of groups for the specified device in the store.
341 *
342 * @return number of groups for the specified device
343 */
344 @Override
345 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700346 return (getGroups(deviceId) != null) ?
alshabibb0285992016-03-28 23:30:37 -0700347 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800348 }
349
350 /**
351 * Returns the groups associated with a device.
352 *
353 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800354 * @return the group entries
355 */
356 @Override
357 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800358 // Let ImmutableSet.copyOf do the type conversion
359 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800360 }
361
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700362 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800363 NodeId master = mastershipService.getMasterFor(deviceId);
364 if (master == null) {
365 log.debug("Failed to getGroups: No master for {}", deviceId);
366 return Collections.emptySet();
367 }
368
369 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
370 .stream()
371 .filter(input -> input.deviceId().equals(deviceId))
372 .collect(Collectors.toSet());
373 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700374 }
375
alshabib10580802015-02-18 18:30:33 -0800376 /**
377 * Returns the stored group entry.
378 *
alshabibb0285992016-03-28 23:30:37 -0700379 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800380 * @param appCookie the group key
alshabib10580802015-02-18 18:30:33 -0800381 * @return a group associated with the key
382 */
383 @Override
384 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700385 return getStoredGroupEntry(deviceId, appCookie);
386 }
387
388 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
389 GroupKey appCookie) {
390 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
391 appCookie));
392 }
393
394 @Override
395 public Group getGroup(DeviceId deviceId, GroupId groupId) {
396 return getStoredGroupEntry(deviceId, groupId);
397 }
398
399 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
400 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700401 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800402 }
403
404 private int getFreeGroupIdValue(DeviceId deviceId) {
405 int freeId = groupIdGen.incrementAndGet();
406
407 while (true) {
Yi Tsengfa394de2017-02-01 11:26:40 -0800408 Group existing = getGroup(deviceId, new GroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800409 if (existing == null) {
410 existing = (
411 extraneousGroupEntriesById.get(deviceId) != null) ?
412 extraneousGroupEntriesById.get(deviceId).
Yi Tsengfa394de2017-02-01 11:26:40 -0800413 get(new GroupId(freeId)) :
alshabib10580802015-02-18 18:30:33 -0800414 null;
415 }
416 if (existing != null) {
417 freeId = groupIdGen.incrementAndGet();
418 } else {
419 break;
420 }
421 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700422 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800423 return freeId;
424 }
425
426 /**
427 * Stores a new group entry using the information from group description.
428 *
429 * @param groupDesc group description to be used to create group entry
430 */
431 @Override
432 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700433 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800434 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800435 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
436 if (existingGroup != null) {
Charles Chan216e3c82016-04-23 14:48:16 -0700437 log.info("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800438 groupDesc.appCookie(), groupDesc.deviceId(),
439 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800440 return;
441 }
442
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700443 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700444 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700445 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700446 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700447 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
448 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700449 + "Can not perform add group operation",
450 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700451 //TODO: Send Group operation failure event
452 return;
453 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700454 GroupStoreMessage groupOp = GroupStoreMessage.
455 createGroupAddRequestMsg(groupDesc.deviceId(),
456 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700457
Madan Jampani175e8fd2015-05-20 14:10:45 -0700458 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700459 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
460 clusterMsgSerializer::serialize,
461 mastershipService.getMasterFor(groupDesc.deviceId()))
462 .whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700463 if (error != null) {
464 log.warn("Failed to send request to master: {} to {}",
alshabibb0285992016-03-28 23:30:37 -0700465 groupOp,
466 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700467 //TODO: Send Group operation failure event
468 } else {
469 log.debug("Sent Group operation request for device {} "
alshabibb0285992016-03-28 23:30:37 -0700470 + "to remote MASTER {}",
471 groupDesc.deviceId(),
472 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700473 }
474 });
alshabib10580802015-02-18 18:30:33 -0800475 return;
476 }
477
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700478 log.debug("Store group for device {} is getting handled locally",
479 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800480 storeGroupDescriptionInternal(groupDesc);
481 }
482
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700483 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
484 ConcurrentMap<GroupId, Group> extraneousMap =
485 extraneousGroupEntriesById.get(deviceId);
486 if (extraneousMap == null) {
487 return null;
488 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800489 return extraneousMap.get(new GroupId(groupId));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700490 }
491
492 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
493 GroupBuckets buckets) {
494 ConcurrentMap<GroupId, Group> extraneousMap =
495 extraneousGroupEntriesById.get(deviceId);
496 if (extraneousMap == null) {
497 return null;
498 }
499
alshabibb0285992016-03-28 23:30:37 -0700500 for (Group extraneousGroup : extraneousMap.values()) {
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700501 if (extraneousGroup.buckets().equals(buckets)) {
502 return extraneousGroup;
503 }
504 }
505 return null;
506 }
507
alshabib10580802015-02-18 18:30:33 -0800508 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
509 // Check if a group is existing with the same key
510 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
511 return;
512 }
513
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700514 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
515 // Device group audit has not completed yet
516 // Add this group description to pending group key table
517 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700518 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700519 groupDesc.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700520 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
521 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800522 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700523 getPendingGroupKeyTable();
524 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
525 groupDesc.appCookie()),
526 group);
527 return;
528 }
529
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700530 Group matchingExtraneousGroup = null;
531 if (groupDesc.givenGroupId() != null) {
532 //Check if there is a extraneous group existing with the same Id
533 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
alshabibb0285992016-03-28 23:30:37 -0700534 groupDesc.deviceId(), groupDesc.givenGroupId());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700535 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800536 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
alshabibb0285992016-03-28 23:30:37 -0700537 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700538 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800539 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700540 //Check if the group buckets matches with user provided buckets
541 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
542 //Group is already existing with the same buckets and Id
543 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800544 log.debug("storeGroupDescriptionInternal: Buckets also matching "
alshabibb0285992016-03-28 23:30:37 -0700545 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700546 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800547 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700548 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700549 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700550 // Insert the newly created group entry into key and id maps
551 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700552 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
553 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700554 // Ensure it also inserted into group id based table to
555 // avoid any chances of duplication in group id generation
556 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700557 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700558 addOrUpdateGroupEntry(matchingExtraneousGroup);
559 removeExtraneousGroupEntry(matchingExtraneousGroup);
560 return;
561 } else {
562 //Group buckets are not matching. Update group
563 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800564 log.debug("storeGroupDescriptionInternal: Buckets are not "
alshabibb0285992016-03-28 23:30:37 -0700565 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700566 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800567 Integer.toHexString(groupDesc.givenGroupId()));
568 StoredGroupEntry modifiedGroup = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700569 matchingExtraneousGroup.id(), groupDesc);
Saurav Das0fd79d92016-03-07 10:58:36 -0800570 modifiedGroup.setState(GroupState.PENDING_UPDATE);
571 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700572 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
573 groupDesc.appCookie()), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800574 // Ensure it also inserted into group id based table to
575 // avoid any chances of duplication in group id generation
576 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700577 put(matchingExtraneousGroup.id(), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800578 removeExtraneousGroupEntry(matchingExtraneousGroup);
579 log.debug("storeGroupDescriptionInternal: Triggering Group "
alshabibb0285992016-03-28 23:30:37 -0700580 + "UPDATE request for {} in device {}",
Saurav Das0fd79d92016-03-07 10:58:36 -0800581 matchingExtraneousGroup.id(),
582 groupDesc.deviceId());
583 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
584 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700585 }
586 }
587 } else {
588 //Check if there is an extraneous group with user provided buckets
589 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
alshabibb0285992016-03-28 23:30:37 -0700590 groupDesc.deviceId(), groupDesc.buckets());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700591 if (matchingExtraneousGroup != null) {
592 //Group is already existing with the same buckets.
593 //So reuse this group.
594 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
595 groupDesc.deviceId());
596 //Create a group entry object
597 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700598 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700599 // Insert the newly created group entry into key and id maps
600 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700601 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
602 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700603 // Ensure it also inserted into group id based table to
604 // avoid any chances of duplication in group id generation
605 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700606 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700607 addOrUpdateGroupEntry(matchingExtraneousGroup);
608 removeExtraneousGroupEntry(matchingExtraneousGroup);
609 return;
610 } else {
611 //TODO: Check if there are any empty groups that can be used here
612 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
613 groupDesc.deviceId());
614 }
615 }
616
Saurav Das100e3b82015-04-30 11:12:10 -0700617 GroupId id = null;
618 if (groupDesc.givenGroupId() == null) {
619 // Get a new group identifier
Yi Tsengfa394de2017-02-01 11:26:40 -0800620 id = new GroupId(getFreeGroupIdValue(groupDesc.deviceId()));
Saurav Das100e3b82015-04-30 11:12:10 -0700621 } else {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800622 // we need to use the identifier passed in by caller, but check if
623 // already used
624 Group existing = getGroup(groupDesc.deviceId(),
Yi Tsengfa394de2017-02-01 11:26:40 -0800625 new GroupId(groupDesc.givenGroupId()));
Saurav Das8be4e3a2016-03-11 17:19:07 -0800626 if (existing != null) {
627 log.warn("Group already exists with the same id: 0x{} in dev:{} "
alshabibb0285992016-03-28 23:30:37 -0700628 + "but with different key: {} (request gkey: {})",
629 Integer.toHexString(groupDesc.givenGroupId()),
630 groupDesc.deviceId(),
631 existing.appCookie(),
632 groupDesc.appCookie());
Saurav Das8be4e3a2016-03-11 17:19:07 -0800633 return;
634 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800635 id = new GroupId(groupDesc.givenGroupId());
Saurav Das100e3b82015-04-30 11:12:10 -0700636 }
alshabib10580802015-02-18 18:30:33 -0800637 // Create a group entry object
638 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700639 // Insert the newly created group entry into key and id maps
640 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700641 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
642 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700643 // Ensure it also inserted into group id based table to
644 // avoid any chances of duplication in group id generation
645 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700646 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700647 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700648 id,
649 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800650 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
651 group));
652 }
653
654 /**
655 * Updates the existing group entry with the information
656 * from group description.
657 *
alshabibb0285992016-03-28 23:30:37 -0700658 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800659 * @param oldAppCookie the current group key
alshabibb0285992016-03-28 23:30:37 -0700660 * @param type update type
661 * @param newBuckets group buckets for updates
alshabib10580802015-02-18 18:30:33 -0800662 * @param newAppCookie optional new group key
663 */
664 @Override
665 public void updateGroupDescription(DeviceId deviceId,
666 GroupKey oldAppCookie,
667 UpdateType type,
668 GroupBuckets newBuckets,
669 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700670 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700671 if (mastershipService.getMasterFor(deviceId) != null &&
672 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700673 log.debug("updateGroupDescription: Device {} local role is not MASTER",
674 deviceId);
675 if (mastershipService.getMasterFor(deviceId) == null) {
676 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700677 + "Can not perform update group operation",
678 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700679 //TODO: Send Group operation failure event
680 return;
681 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700682 GroupStoreMessage groupOp = GroupStoreMessage.
683 createGroupUpdateRequestMsg(deviceId,
684 oldAppCookie,
685 type,
686 newBuckets,
687 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700688
Madan Jampani175e8fd2015-05-20 14:10:45 -0700689 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700690 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
691 clusterMsgSerializer::serialize,
692 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
693 if (error != null) {
694 log.warn("Failed to send request to master: {} to {}",
695 groupOp,
696 mastershipService.getMasterFor(deviceId), error);
697 }
698 //TODO: Send Group operation failure event
699 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700700 return;
701 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700702 log.debug("updateGroupDescription for device {} is getting handled locally",
703 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700704 updateGroupDescriptionInternal(deviceId,
705 oldAppCookie,
706 type,
707 newBuckets,
708 newAppCookie);
709 }
710
711 private void updateGroupDescriptionInternal(DeviceId deviceId,
alshabibb0285992016-03-28 23:30:37 -0700712 GroupKey oldAppCookie,
713 UpdateType type,
714 GroupBuckets newBuckets,
715 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800716 // Check if a group is existing with the provided key
717 Group oldGroup = getGroup(deviceId, oldAppCookie);
718 if (oldGroup == null) {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800719 log.warn("updateGroupDescriptionInternal: Group not found...strange. "
alshabibb0285992016-03-28 23:30:37 -0700720 + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
alshabib10580802015-02-18 18:30:33 -0800721 return;
722 }
723
724 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
725 type,
726 newBuckets);
727 if (newBucketList != null) {
728 // Create a new group object from the old group
729 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
730 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
731 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
732 oldGroup.deviceId(),
733 oldGroup.type(),
734 updatedBuckets,
735 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700736 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800737 oldGroup.appId());
738 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
739 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700740 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
alshabibb0285992016-03-28 23:30:37 -0700741 oldGroup.id(),
742 oldGroup.deviceId(),
743 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800744 newGroup.setState(GroupState.PENDING_UPDATE);
745 newGroup.setLife(oldGroup.life());
746 newGroup.setPackets(oldGroup.packets());
747 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700748 //Update the group entry in groupkey based map.
749 //Update to groupid based map will happen in the
750 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700751 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
752 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700753 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700754 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
755 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800756 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700757 } else {
758 log.warn("updateGroupDescriptionInternal with type {}: No "
alshabibb0285992016-03-28 23:30:37 -0700759 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800760 }
761 }
762
763 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
764 UpdateType type,
765 GroupBuckets buckets) {
Victor Silva0282ab82016-11-15 16:30:27 -0300766 if (type == UpdateType.SET) {
767 return buckets.buckets();
768 }
769
Victor Silvadf1eeae2016-08-12 15:28:57 -0300770 List<GroupBucket> oldBuckets = oldGroup.buckets().buckets();
771 List<GroupBucket> updatedBucketList = new ArrayList<>();
alshabib10580802015-02-18 18:30:33 -0800772 boolean groupDescUpdated = false;
773
774 if (type == UpdateType.ADD) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300775 List<GroupBucket> newBuckets = buckets.buckets();
776
777 // Add old buckets that will not be updated and check if any will be updated.
778 for (GroupBucket oldBucket : oldBuckets) {
779 int newBucketIndex = newBuckets.indexOf(oldBucket);
780
781 if (newBucketIndex != -1) {
782 GroupBucket newBucket = newBuckets.get(newBucketIndex);
783 if (!newBucket.hasSameParameters(oldBucket)) {
784 // Bucket will be updated
785 groupDescUpdated = true;
786 }
787 } else {
788 // Old bucket will remain the same - add it.
789 updatedBucketList.add(oldBucket);
alshabib10580802015-02-18 18:30:33 -0800790 }
791 }
Victor Silvadf1eeae2016-08-12 15:28:57 -0300792
793 // Add all new buckets
794 updatedBucketList.addAll(newBuckets);
795 if (!oldBuckets.containsAll(newBuckets)) {
796 groupDescUpdated = true;
797 }
798
alshabib10580802015-02-18 18:30:33 -0800799 } else if (type == UpdateType.REMOVE) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300800 List<GroupBucket> bucketsToRemove = buckets.buckets();
801
802 // Check which old buckets should remain
803 for (GroupBucket oldBucket : oldBuckets) {
804 if (!bucketsToRemove.contains(oldBucket)) {
805 updatedBucketList.add(oldBucket);
806 } else {
alshabib10580802015-02-18 18:30:33 -0800807 groupDescUpdated = true;
808 }
809 }
810 }
811
812 if (groupDescUpdated) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300813 return updatedBucketList;
alshabib10580802015-02-18 18:30:33 -0800814 } else {
815 return null;
816 }
817 }
818
819 /**
820 * Triggers deleting the existing group entry.
821 *
alshabibb0285992016-03-28 23:30:37 -0700822 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800823 * @param appCookie the group key
824 */
825 @Override
826 public void deleteGroupDescription(DeviceId deviceId,
827 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700828 // Check if group to be deleted by a remote instance
829 if (mastershipService.
830 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700831 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
832 deviceId);
833 if (mastershipService.getMasterFor(deviceId) == null) {
834 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700835 + "Can not perform delete group operation",
836 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700837 //TODO: Send Group operation failure event
838 return;
839 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 GroupStoreMessage groupOp = GroupStoreMessage.
841 createGroupDeleteRequestMsg(deviceId,
842 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700843
Madan Jampani175e8fd2015-05-20 14:10:45 -0700844 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700845 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
846 clusterMsgSerializer::serialize,
847 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
848 if (error != null) {
849 log.warn("Failed to send request to master: {} to {}",
850 groupOp,
851 mastershipService.getMasterFor(deviceId), error);
852 }
853 //TODO: Send Group operation failure event
854 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700855 return;
856 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700857 log.debug("deleteGroupDescription in device {} is getting handled locally",
858 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700859 deleteGroupDescriptionInternal(deviceId, appCookie);
860 }
861
862 private void deleteGroupDescriptionInternal(DeviceId deviceId,
863 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800864 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700865 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800866 if (existing == null) {
867 return;
868 }
869
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700870 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
alshabibb0285992016-03-28 23:30:37 -0700871 existing.id(),
872 existing.deviceId(),
873 existing.state());
alshabib10580802015-02-18 18:30:33 -0800874 synchronized (existing) {
875 existing.setState(GroupState.PENDING_DELETE);
Saurav Das80980c72016-03-23 11:22:49 -0700876 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700877 put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
878 existing);
alshabib10580802015-02-18 18:30:33 -0800879 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700880 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
881 deviceId);
alshabib10580802015-02-18 18:30:33 -0800882 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
883 }
884
885 /**
886 * Stores a new group entry, or updates an existing entry.
887 *
888 * @param group group entry
889 */
890 @Override
891 public void addOrUpdateGroupEntry(Group group) {
892 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700893 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
894 group.id());
alshabib10580802015-02-18 18:30:33 -0800895 GroupEvent event = null;
896
897 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800898 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700899 group.id(),
900 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800901 synchronized (existing) {
alshabibb0285992016-03-28 23:30:37 -0700902 for (GroupBucket bucket : group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700903 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700904 existing.buckets().buckets()
alshabibb0285992016-03-28 23:30:37 -0700905 .stream()
906 .filter((existingBucket) -> (existingBucket.equals(bucket)))
907 .findFirst();
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700908 if (matchingBucket.isPresent()) {
909 ((StoredGroupBucketEntry) matchingBucket.
910 get()).setPackets(bucket.packets());
911 ((StoredGroupBucketEntry) matchingBucket.
912 get()).setBytes(bucket.bytes());
913 } else {
914 log.warn("addOrUpdateGroupEntry: No matching "
alshabibb0285992016-03-28 23:30:37 -0700915 + "buckets to update stats");
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700916 }
917 }
alshabib10580802015-02-18 18:30:33 -0800918 existing.setLife(group.life());
919 existing.setPackets(group.packets());
920 existing.setBytes(group.bytes());
alshabibb0285992016-03-28 23:30:37 -0700921 existing.setReferenceCount(group.referenceCount());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700922 if ((existing.state() == GroupState.PENDING_ADD) ||
alshabibb0285992016-03-28 23:30:37 -0700923 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800924 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700925 existing.id(),
926 existing.deviceId(),
927 existing.state());
alshabib10580802015-02-18 18:30:33 -0800928 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700929 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800930 event = new GroupEvent(Type.GROUP_ADDED, existing);
931 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800932 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700933 existing.id(),
934 existing.deviceId(),
935 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700936 existing.setState(GroupState.ADDED);
937 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800938 event = new GroupEvent(Type.GROUP_UPDATED, existing);
939 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700940 //Re-PUT map entries to trigger map update events
941 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700942 put(new GroupStoreKeyMapKey(existing.deviceId(),
943 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800944 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700945 } else {
946 log.warn("addOrUpdateGroupEntry: Group update "
alshabibb0285992016-03-28 23:30:37 -0700947 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800948 }
949
950 if (event != null) {
951 notifyDelegate(event);
952 }
953 }
954
955 /**
956 * Removes the group entry from store.
957 *
958 * @param group group entry
959 */
960 @Override
961 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700962 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
963 group.id());
alshabib10580802015-02-18 18:30:33 -0800964
965 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700966 log.debug("removeGroupEntry: removing group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700967 group.id(),
968 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700969 //Removal from groupid based map will happen in the
970 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700971 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
972 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800973 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700974 } else {
975 log.warn("removeGroupEntry for {} in device{} is "
alshabibb0285992016-03-28 23:30:37 -0700976 + "not existing in our maps",
977 group.id(),
978 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800979 }
980 }
981
Victor Silva4e8b7832016-08-17 17:11:19 -0300982 private void purgeGroupEntries(Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entries) {
983 entries.forEach(entry -> {
984 groupStoreEntriesByKey.remove(entry.getKey());
985 });
986 }
987
alshabib10580802015-02-18 18:30:33 -0800988 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800989 public void purgeGroupEntry(DeviceId deviceId) {
Victor Silva4e8b7832016-08-17 17:11:19 -0300990 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
Charles Chan0c7c43b2016-01-14 17:39:20 -0800991 new HashSet<>();
992
Madan Jampani0b847532016-03-03 13:44:15 -0800993 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -0800994 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Victor Silva4e8b7832016-08-17 17:11:19 -0300995 .forEach(entriesPendingRemove::add);
Charles Chan0c7c43b2016-01-14 17:39:20 -0800996
Victor Silva4e8b7832016-08-17 17:11:19 -0300997 purgeGroupEntries(entriesPendingRemove);
998 }
999
1000 @Override
1001 public void purgeGroupEntries() {
1002 purgeGroupEntries(getGroupStoreKeyMap().entrySet());
Charles Chan0c7c43b2016-01-14 17:39:20 -08001003 }
1004
1005 @Override
alshabib10580802015-02-18 18:30:33 -08001006 public void deviceInitialAuditCompleted(DeviceId deviceId,
1007 boolean completed) {
1008 synchronized (deviceAuditStatus) {
1009 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001010 log.debug("AUDIT completed for device {}",
1011 deviceId);
alshabib10580802015-02-18 18:30:33 -08001012 deviceAuditStatus.put(deviceId, true);
1013 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001014 List<StoredGroupEntry> pendingGroupRequests =
1015 getPendingGroupKeyTable().values()
alshabibb0285992016-03-28 23:30:37 -07001016 .stream()
1017 .filter(g -> g.deviceId().equals(deviceId))
1018 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001019 log.debug("processing pending group add requests for device {} and number of pending requests {}",
alshabibb0285992016-03-28 23:30:37 -07001020 deviceId,
1021 pendingGroupRequests.size());
1022 for (Group group : pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -08001023 GroupDescription tmp = new DefaultGroupDescription(
1024 group.deviceId(),
1025 group.type(),
1026 group.buckets(),
1027 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -07001028 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -08001029 group.appId());
1030 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001031 getPendingGroupKeyTable().
alshabibb0285992016-03-28 23:30:37 -07001032 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -08001033 }
alshabib10580802015-02-18 18:30:33 -08001034 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001035 Boolean audited = deviceAuditStatus.get(deviceId);
1036 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001037 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -08001038 deviceAuditStatus.put(deviceId, false);
1039 }
1040 }
1041 }
1042 }
1043
1044 @Override
1045 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
1046 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001047 Boolean audited = deviceAuditStatus.get(deviceId);
1048 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -08001049 }
1050 }
1051
1052 @Override
1053 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
1054
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001055 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
1056 operation.groupId());
alshabib10580802015-02-18 18:30:33 -08001057
1058 if (existing == null) {
1059 log.warn("No group entry with ID {} found ", operation.groupId());
1060 return;
1061 }
1062
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001063 log.warn("groupOperationFailed: group operation {} failed"
alshabibb0285992016-03-28 23:30:37 -07001064 + "for group {} in device {} with code {}",
1065 operation.opType(),
1066 existing.id(),
1067 existing.deviceId(),
1068 operation.failureCode());
Saurav Das0fd79d92016-03-07 10:58:36 -08001069 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
Saurav Das8be4e3a2016-03-11 17:19:07 -08001070 if (operation.buckets().equals(existing.buckets())) {
Saurav Dasc88d4662017-05-15 15:34:25 -07001071 if (existing.state() == GroupState.PENDING_ADD ||
1072 existing.state() == GroupState.PENDING_ADD_RETRY) {
Saurav Das8be4e3a2016-03-11 17:19:07 -08001073 log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
alshabibb0285992016-03-28 23:30:37 -07001074 + "add state - moving to ADDED for group {} in device {}",
1075 existing.id(), deviceId);
Saurav Das8be4e3a2016-03-11 17:19:07 -08001076 addOrUpdateGroupEntry(existing);
1077 return;
1078 } else {
Saurav Dasc88d4662017-05-15 15:34:25 -07001079 log.warn("GROUP_EXISTS: GroupId and Buckets match but existing"
1080 + "group in state: {}", existing.state());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001081 }
Saurav Dasc88d4662017-05-15 15:34:25 -07001082 } else {
1083 log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
1084 + "Operation: {} Existing: {}", operation.buckets(),
1085 existing.buckets());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001086 }
Saurav Das0fd79d92016-03-07 10:58:36 -08001087 }
alshabib10580802015-02-18 18:30:33 -08001088 switch (operation.opType()) {
1089 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001090 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001091 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
1092 log.warn("groupOperationFailed: cleaningup "
alshabibb0285992016-03-28 23:30:37 -07001093 + "group {} from store in device {}....",
1094 existing.id(),
1095 existing.deviceId());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001096 //Removal from groupid based map will happen in the
1097 //map update listener
1098 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
1099 existing.appCookie()));
1100 }
alshabib10580802015-02-18 18:30:33 -08001101 break;
1102 case MODIFY:
1103 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
1104 break;
1105 case DELETE:
1106 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
1107 break;
1108 default:
1109 log.warn("Unknown group operation type {}", operation.opType());
1110 }
alshabib10580802015-02-18 18:30:33 -08001111 }
1112
1113 @Override
1114 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001115 log.debug("add/update extraneous group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001116 group.id(),
1117 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001118 ConcurrentMap<GroupId, Group> extraneousIdTable =
1119 getExtraneousGroupIdTable(group.deviceId());
1120 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -07001121 // Don't remove the extraneous groups, instead re-use it when
1122 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -08001123 }
1124
1125 @Override
1126 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001127 log.debug("remove extraneous group entry {} of device {} from store",
alshabibb0285992016-03-28 23:30:37 -07001128 group.id(),
1129 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001130 ConcurrentMap<GroupId, Group> extraneousIdTable =
1131 getExtraneousGroupIdTable(group.deviceId());
1132 extraneousIdTable.remove(group.id());
1133 }
1134
1135 @Override
1136 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
1137 // flatten and make iterator unmodifiable
1138 return FluentIterable.from(
1139 getExtraneousGroupIdTable(deviceId).values());
1140 }
1141
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001142 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001143 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001144 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001145 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001146 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001147
1148 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001149 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001150 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001151 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001152 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001153 if ((key == null) && (group == null)) {
1154 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001155 + "event {} with null entry", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001156 return;
1157 } else if (group == null) {
1158 group = getGroupIdTable(key.deviceId()).values()
1159 .stream()
1160 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
Yuta HIGUCHI6e5f4702016-11-21 11:42:11 -08001161 .findFirst().orElse(null);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001162 if (group == null) {
1163 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001164 + "event {} with null entry... can not process", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001165 return;
1166 }
1167 }
1168 log.trace("received groupid map event {} for id {} in device {}",
1169 mapEvent.type(),
1170 group.id(),
jaegonkim68e080c2016-12-01 22:31:01 +09001171 (key != null ? key.deviceId() : null));
Madan Jampani0b847532016-03-03 13:44:15 -08001172 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001173 // Update the group ID table
1174 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001175 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1176 if (value.state() == Group.GroupState.ADDED) {
1177 if (value.isGroupStateAddedFirstTime()) {
1178 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001179 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001180 group.id(),
1181 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001182 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001183 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001184 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001185 group.id(),
1186 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001187 }
1188 }
Madan Jampani0b847532016-03-03 13:44:15 -08001189 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001190 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001191 // Remove the entry from the group ID table
1192 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001193 }
1194
1195 if (groupEvent != null) {
1196 notifyDelegate(groupEvent);
1197 }
1198 }
1199 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001200
helenyrwua1c41152016-08-18 16:16:14 -07001201 private void processGroupMessage(GroupStoreMessage message) {
1202 if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1203 // FIXME: groupStoreEntriesByKey inaccessible here
1204 getGroupIdTable(message.deviceId()).values()
1205 .stream()
1206 .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1207 .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1208 }
1209 }
1210
Madan Jampani01e05fb2015-08-13 13:29:36 -07001211 private void process(GroupStoreMessage groupOp) {
1212 log.debug("Received remote group operation {} request for device {}",
alshabibb0285992016-03-28 23:30:37 -07001213 groupOp.type(),
1214 groupOp.deviceId());
1215 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1216 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1217 return;
1218 }
1219 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1220 storeGroupDescriptionInternal(groupOp.groupDesc());
1221 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1222 updateGroupDescriptionInternal(groupOp.deviceId(),
1223 groupOp.appCookie(),
1224 groupOp.updateType(),
1225 groupOp.updateBuckets(),
1226 groupOp.newAppCookie());
1227 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1228 deleteGroupDescriptionInternal(groupOp.deviceId(),
1229 groupOp.appCookie());
1230 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001231 }
1232
1233 /**
1234 * Flattened map key to be used to store group entries.
1235 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001236 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001237 private final DeviceId deviceId;
1238
1239 public GroupStoreMapKey(DeviceId deviceId) {
1240 this.deviceId = deviceId;
1241 }
1242
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001243 public DeviceId deviceId() {
1244 return deviceId;
1245 }
1246
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001247 @Override
1248 public boolean equals(Object o) {
1249 if (this == o) {
1250 return true;
1251 }
1252 if (!(o instanceof GroupStoreMapKey)) {
1253 return false;
1254 }
1255 GroupStoreMapKey that = (GroupStoreMapKey) o;
1256 return this.deviceId.equals(that.deviceId);
1257 }
1258
1259 @Override
1260 public int hashCode() {
1261 int result = 17;
1262
1263 result = 31 * result + Objects.hash(this.deviceId);
1264
1265 return result;
1266 }
1267 }
1268
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001269 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001270 private final GroupKey appCookie;
alshabibb0285992016-03-28 23:30:37 -07001271
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001272 public GroupStoreKeyMapKey(DeviceId deviceId,
1273 GroupKey appCookie) {
1274 super(deviceId);
1275 this.appCookie = appCookie;
1276 }
1277
1278 @Override
1279 public boolean equals(Object o) {
1280 if (this == o) {
1281 return true;
1282 }
1283 if (!(o instanceof GroupStoreKeyMapKey)) {
1284 return false;
1285 }
1286 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1287 return (super.equals(that) &&
1288 this.appCookie.equals(that.appCookie));
1289 }
1290
1291 @Override
1292 public int hashCode() {
1293 int result = 17;
1294
1295 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1296
1297 return result;
1298 }
1299 }
1300
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001301 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001302 private final GroupId groupId;
alshabibb0285992016-03-28 23:30:37 -07001303
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001304 public GroupStoreIdMapKey(DeviceId deviceId,
1305 GroupId groupId) {
1306 super(deviceId);
1307 this.groupId = groupId;
1308 }
1309
1310 @Override
1311 public boolean equals(Object o) {
1312 if (this == o) {
1313 return true;
1314 }
1315 if (!(o instanceof GroupStoreIdMapKey)) {
1316 return false;
1317 }
1318 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1319 return (super.equals(that) &&
1320 this.groupId.equals(that.groupId));
1321 }
1322
1323 @Override
1324 public int hashCode() {
1325 int result = 17;
1326
1327 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1328
1329 return result;
1330 }
1331 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001332
1333 @Override
1334 public void pushGroupMetrics(DeviceId deviceId,
1335 Collection<Group> groupEntries) {
1336 boolean deviceInitialAuditStatus =
1337 deviceInitialAuditStatus(deviceId);
1338 Set<Group> southboundGroupEntries =
1339 Sets.newHashSet(groupEntries);
1340 Set<StoredGroupEntry> storedGroupEntries =
1341 Sets.newHashSet(getStoredGroups(deviceId));
1342 Set<Group> extraneousStoredEntries =
1343 Sets.newHashSet(getExtraneousGroups(deviceId));
1344
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001345 if (log.isTraceEnabled()) {
1346 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1347 southboundGroupEntries.size(),
1348 deviceId);
1349 for (Group group : southboundGroupEntries) {
1350 log.trace("Group {} in device {}", group, deviceId);
1351 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001352
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001353 log.trace("Displaying all ({}) stored group entries for device {}",
1354 storedGroupEntries.size(),
1355 deviceId);
1356 for (StoredGroupEntry group : storedGroupEntries) {
1357 log.trace("Stored Group {} for device {}", group, deviceId);
1358 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001359 }
1360
alshabibb0285992016-03-28 23:30:37 -07001361 garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
1362
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001363 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1364 Group group = it2.next();
1365 if (storedGroupEntries.remove(group)) {
1366 // we both have the group, let's update some info then.
1367 log.trace("Group AUDIT: group {} exists in both planes for device {}",
alshabibb0285992016-03-28 23:30:37 -07001368 group.id(), deviceId);
1369
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001370 groupAdded(group);
1371 it2.remove();
1372 }
1373 }
1374 for (Group group : southboundGroupEntries) {
1375 if (getGroup(group.deviceId(), group.id()) != null) {
1376 // There is a group existing with the same id
1377 // It is possible that group update is
1378 // in progress while we got a stale info from switch
1379 if (!storedGroupEntries.remove(getGroup(
alshabibb0285992016-03-28 23:30:37 -07001380 group.deviceId(), group.id()))) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001381 log.warn("Group AUDIT: Inconsistent state:"
alshabibb0285992016-03-28 23:30:37 -07001382 + "Group exists in ID based table while "
1383 + "not present in key based table");
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001384 }
1385 } else {
1386 // there are groups in the switch that aren't in the store
1387 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001388 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001389 extraneousStoredEntries.remove(group);
Kavitha Alagesanc56cded2017-01-13 10:48:18 +05301390 if (allowExtraneousGroups) {
1391 extraneousGroup(group);
1392 } else {
1393 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
1394 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001395 }
1396 }
1397 for (Group group : storedGroupEntries) {
1398 // there are groups in the store that aren't in the switch
1399 log.debug("Group AUDIT: group {} missing in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001400 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001401 groupMissing(group);
1402 }
1403 for (Group group : extraneousStoredEntries) {
1404 // there are groups in the extraneous store that
1405 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001406 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
alshabibb0285992016-03-28 23:30:37 -07001407 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001408 removeExtraneousGroupEntry(group);
1409 }
1410
1411 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001412 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
alshabibb0285992016-03-28 23:30:37 -07001413 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001414 deviceInitialAuditCompleted(deviceId, true);
1415 }
1416 }
1417
helenyrwu89470f12016-08-12 13:18:10 -07001418 @Override
1419 public void notifyOfFailovers(Collection<Group> failoverGroups) {
helenyrwu89470f12016-08-12 13:18:10 -07001420 failoverGroups.forEach(group -> {
1421 if (group.type() == Group.Type.FAILOVER) {
helenyrwua1c41152016-08-18 16:16:14 -07001422 groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1423 group.deviceId(), group));
helenyrwu89470f12016-08-12 13:18:10 -07001424 }
1425 });
helenyrwu89470f12016-08-12 13:18:10 -07001426 }
1427
alshabibb0285992016-03-28 23:30:37 -07001428 private void garbageCollect(DeviceId deviceId,
1429 Set<Group> southboundGroupEntries,
1430 Set<StoredGroupEntry> storedGroupEntries) {
1431 if (!garbageCollect) {
1432 return;
1433 }
1434
1435 Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
1436 while (it.hasNext()) {
1437 StoredGroupEntry group = it.next();
1438 if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
1439 log.debug("Garbage collecting group {} on {}", group, deviceId);
1440 deleteGroupDescription(deviceId, group.appCookie());
1441 southboundGroupEntries.remove(group);
1442 it.remove();
1443 }
1444 }
1445 }
1446
1447 private boolean checkGroupRefCount(Group group) {
1448 return (group.referenceCount() == 0 && group.age() >= gcThresh);
1449 }
1450
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001451 private void groupMissing(Group group) {
1452 switch (group.state()) {
1453 case PENDING_DELETE:
1454 log.debug("Group {} delete confirmation from device {}",
1455 group, group.deviceId());
1456 removeGroupEntry(group);
1457 break;
1458 case ADDED:
1459 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001460 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001461 case PENDING_UPDATE:
1462 log.debug("Group {} is in store but not on device {}",
1463 group, group.deviceId());
1464 StoredGroupEntry existing =
1465 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001466 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
alshabibb0285992016-03-28 23:30:37 -07001467 existing.id(),
1468 existing.deviceId(),
1469 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001470 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001471 //Re-PUT map entries to trigger map update events
1472 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -07001473 put(new GroupStoreKeyMapKey(existing.deviceId(),
1474 existing.appCookie()), existing);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001475 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1476 group));
1477 break;
1478 default:
1479 log.debug("Group {} has not been installed.", group);
1480 break;
1481 }
1482 }
1483
1484 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001485 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001486 group, group.deviceId());
1487 addOrUpdateExtraneousGroupEntry(group);
1488 }
1489
1490 private void groupAdded(Group group) {
1491 log.trace("Group {} Added or Updated in device {}",
1492 group, group.deviceId());
1493 addOrUpdateGroupEntry(group);
1494 }
alshabib10580802015-02-18 18:30:33 -08001495}