blob: 13350875d5331a90b5e4982921e55f9f3404c50d [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib10580802015-02-18 18:30:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
alshabibb0285992016-03-28 23:30:37 -070025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080029import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onlab.util.KryoNamespace;
alshabibb0285992016-03-28 23:30:37 -070031import org.onosproject.cfg.ComponentConfigService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080033import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
alshabib10580802015-02-18 18:30:33 -080038import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080040import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070041import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.Group;
43import org.onosproject.net.group.Group.GroupState;
44import org.onosproject.net.group.GroupBucket;
45import org.onosproject.net.group.GroupBuckets;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
48import org.onosproject.net.group.GroupEvent.Type;
49import org.onosproject.net.group.GroupKey;
50import org.onosproject.net.group.GroupOperation;
51import org.onosproject.net.group.GroupStore;
52import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070053import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080054import org.onosproject.net.group.StoredGroupEntry;
55import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070057import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080058import org.onosproject.store.service.ConsistentMap;
59import org.onosproject.store.service.MapEvent;
60import org.onosproject.store.service.MapEventListener;
alshabibb0285992016-03-28 23:30:37 -070061import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani0b847532016-03-03 13:44:15 -080062import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070063import org.onosproject.store.service.StorageService;
helenyrwua1c41152016-08-18 16:16:14 -070064import org.onosproject.store.service.Topic;
Madan Jampani0b847532016-03-03 13:44:15 -080065import org.onosproject.store.service.Versioned;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053066import org.onosproject.store.service.DistributedPrimitive.Status;
alshabibb0285992016-03-28 23:30:37 -070067import org.osgi.service.component.ComponentContext;
alshabib10580802015-02-18 18:30:33 -080068import org.slf4j.Logger;
69
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070071import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080072import java.util.Collections;
alshabibb0285992016-03-28 23:30:37 -070073import java.util.Dictionary;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080075import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080078import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080079import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070080import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070081import java.util.Optional;
alshabibb0285992016-03-28 23:30:37 -070082import java.util.Properties;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070083import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070084import java.util.concurrent.ConcurrentHashMap;
85import java.util.concurrent.ConcurrentMap;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053088import java.util.concurrent.ScheduledExecutorService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070089import java.util.concurrent.atomic.AtomicInteger;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053090import java.util.function.Consumer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070091import java.util.stream.Collectors;
92
alshabibb0285992016-03-28 23:30:37 -070093import static com.google.common.base.Strings.isNullOrEmpty;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053094import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibb0285992016-03-28 23:30:37 -070095import static org.onlab.util.Tools.get;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070096import static org.onlab.util.Tools.groupedThreads;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080098
99/**
Saurav Das0fd79d92016-03-07 10:58:36 -0800100 * Manages inventory of group entries using distributed group stores from the
101 * storage service.
alshabib10580802015-02-18 18:30:33 -0800102 */
103@Component(immediate = true)
104@Service
105public class DistributedGroupStore
106 extends AbstractStore<GroupEvent, GroupStoreDelegate>
107 implements GroupStore {
108
109 private final Logger log = getLogger(getClass());
110
alshabibb0285992016-03-28 23:30:37 -0700111 private static final boolean GARBAGE_COLLECT = false;
112 private static final int GC_THRESH = 6;
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530113 private static final boolean ALLOW_EXTRANEOUS_GROUPS = true;
alshabibb0285992016-03-28 23:30:37 -0700114
alshabib10580802015-02-18 18:30:33 -0800115 private final int dummyId = 0xffffffff;
Yi Tsengfa394de2017-02-01 11:26:40 -0800116 private final GroupId dummyGroupId = new GroupId(dummyId);
alshabib10580802015-02-18 18:30:33 -0800117
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ClusterCommunicationService clusterCommunicator;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 protected StorageService storageService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700128 protected MastershipService mastershipService;
129
alshabibb0285992016-03-28 23:30:37 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ComponentConfigService cfgService;
132
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530133 private ScheduledExecutorService executor;
134 private Consumer<Status> statusChangeListener;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800136 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700137 StoredGroupEntry> groupStoreEntriesByKey = null;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700139 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
alshabibb0285992016-03-28 23:30:37 -0700140 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800141 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700142 StoredGroupEntry> auditPendingReqQueue = null;
Frank Wange0eb5ce2016-07-01 18:21:25 +0800143 private MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry>
144 mapListener = new GroupStoreKeyMapListener();
alshabib10580802015-02-18 18:30:33 -0800145 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
146 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 private ExecutorService messageHandlingExecutor;
148 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700149 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800150
151 private final AtomicInteger groupIdGen = new AtomicInteger();
152
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700153 private KryoNamespace clusterMsgSerializer;
154
helenyrwua1c41152016-08-18 16:16:14 -0700155 private static Topic<GroupStoreMessage> groupTopic;
156
alshabibb0285992016-03-28 23:30:37 -0700157 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
158 label = "Enable group garbage collection")
159 private boolean garbageCollect = GARBAGE_COLLECT;
160
161 @Property(name = "gcThresh", intValue = GC_THRESH,
162 label = "Number of rounds for group garbage collection")
163 private int gcThresh = GC_THRESH;
164
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530165 @Property(name = "allowExtraneousGroups", boolValue = ALLOW_EXTRANEOUS_GROUPS,
166 label = "Allow groups in switches not installed by ONOS")
167 private boolean allowExtraneousGroups = ALLOW_EXTRANEOUS_GROUPS;
alshabibb0285992016-03-28 23:30:37 -0700168
alshabib10580802015-02-18 18:30:33 -0800169 @Activate
sisubram4beea652017-08-09 10:38:14 +0000170 public void activate(ComponentContext context) {
alshabibb0285992016-03-28 23:30:37 -0700171 cfgService.registerProperties(getClass());
sisubram4beea652017-08-09 10:38:14 +0000172 modified(context);
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700173 KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
alshabibb0285992016-03-28 23:30:37 -0700174 .register(KryoNamespaces.API)
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700175 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
alshabibb0285992016-03-28 23:30:37 -0700176 .register(DefaultGroup.class,
177 DefaultGroupBucket.class,
178 DefaultGroupDescription.class,
179 DefaultGroupKey.class,
180 GroupDescription.Type.class,
181 Group.GroupState.class,
182 GroupBuckets.class,
alshabibb0285992016-03-28 23:30:37 -0700183 GroupStoreMessage.class,
184 GroupStoreMessage.Type.class,
185 UpdateType.class,
186 GroupStoreMessageSubjects.class,
187 MultiValuedTimestamp.class,
188 GroupStoreKeyMapKey.class,
189 GroupStoreIdMapKey.class,
190 GroupStoreMapKey.class
191 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700192
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700193 clusterMsgSerializer = kryoBuilder.build("GroupStore");
194 Serializer serializer = Serializer.using(clusterMsgSerializer);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700195
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700196 messageHandlingExecutor = Executors.
197 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
198 groupedThreads("onos/store/group",
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700199 "message-handlers",
200 log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700201
202 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
alshabibb0285992016-03-28 23:30:37 -0700203 clusterMsgSerializer::deserialize,
204 this::process,
205 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700206
Madan Jampani0b847532016-03-03 13:44:15 -0800207 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700208
Madan Jampani0b847532016-03-03 13:44:15 -0800209 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
210 .withName("onos-group-store-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700211 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700212 .build();
Frank Wange0eb5ce2016-07-01 18:21:25 +0800213 groupStoreEntriesByKey.addListener(mapListener);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700214 log.debug("Current size of groupstorekeymap:{}",
215 groupStoreEntriesByKey.size());
Thiago Santosfb73c502016-08-18 18:15:13 -0300216 synchronizeGroupStoreEntries();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700217
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530218 log.debug("Creating GroupStoreId Map From GroupStoreKey Map");
219 matchGroupEntries();
220 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/group", "store", log));
221 statusChangeListener = status -> {
222 if (status == Status.ACTIVE) {
223 executor.execute(this::matchGroupEntries);
224 }
225 };
226 groupStoreEntriesByKey.addStatusChangeListener(statusChangeListener);
227
Madan Jampani0b847532016-03-03 13:44:15 -0800228 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700229
Madan Jampani0b847532016-03-03 13:44:15 -0800230 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
231 .withName("onos-pending-group-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700232 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700233 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700234 log.debug("Current size of pendinggroupkeymap:{}",
235 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700236
helenyrwua1c41152016-08-18 16:16:14 -0700237 groupTopic = getOrCreateGroupTopic(serializer);
238 groupTopic.subscribe(this::processGroupMessage);
239
alshabib10580802015-02-18 18:30:33 -0800240 log.info("Started");
241 }
242
243 @Deactivate
244 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800245 groupStoreEntriesByKey.removeListener(mapListener);
alshabibb0285992016-03-28 23:30:37 -0700246 cfgService.unregisterProperties(getClass(), false);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700247 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
alshabib10580802015-02-18 18:30:33 -0800248 log.info("Stopped");
249 }
250
alshabibb0285992016-03-28 23:30:37 -0700251 @Modified
252 public void modified(ComponentContext context) {
253 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
254
255 try {
256 String s = get(properties, "garbageCollect");
257 garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
258
259 s = get(properties, "gcThresh");
260 gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530261
262 s = get(properties, "allowExtraneousGroups");
263 allowExtraneousGroups = isNullOrEmpty(s) ? ALLOW_EXTRANEOUS_GROUPS : Boolean.parseBoolean(s.trim());
alshabibb0285992016-03-28 23:30:37 -0700264 } catch (Exception e) {
265 gcThresh = GC_THRESH;
266 garbageCollect = GARBAGE_COLLECT;
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530267 allowExtraneousGroups = ALLOW_EXTRANEOUS_GROUPS;
alshabibb0285992016-03-28 23:30:37 -0700268 }
269 }
270
helenyrwua1c41152016-08-18 16:16:14 -0700271 private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
272 if (groupTopic == null) {
273 return storageService.getTopic("group-failover-notif", serializer);
274 } else {
275 return groupTopic;
276 }
Sho SHIMIZUa6285542017-01-12 15:08:24 -0800277 }
helenyrwua1c41152016-08-18 16:16:14 -0700278
alshabib10580802015-02-18 18:30:33 -0800279 /**
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530280 * Updating values of groupEntriesById.
281 */
282 private void matchGroupEntries() {
283 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupStoreEntriesByKey.asJavaMap().entrySet()) {
284 StoredGroupEntry group = entry.getValue();
285 getGroupIdTable(entry.getKey().deviceId()).put(group.id(), group);
286 }
287 }
288
Thiago Santosfb73c502016-08-18 18:15:13 -0300289
290 private void synchronizeGroupStoreEntries() {
291 Map<GroupStoreKeyMapKey, StoredGroupEntry> groupEntryMap = groupStoreEntriesByKey.asJavaMap();
292 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupEntryMap.entrySet()) {
Thiago Santosfb73c502016-08-18 18:15:13 -0300293 StoredGroupEntry value = entry.getValue();
Thiago Santosfb73c502016-08-18 18:15:13 -0300294 ConcurrentMap<GroupId, StoredGroupEntry> groupIdTable = getGroupIdTable(value.deviceId());
295 groupIdTable.put(value.id(), value);
296 }
297 }
298
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530299 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700300 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800301 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800303 */
Madan Jampani0b847532016-03-03 13:44:15 -0800304 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700305 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800306 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800307 }
308
309 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700310 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800311 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700312 * @param deviceId identifier of the device
313 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800314 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700315 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700316 return groupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800317 }
318
319 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700320 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800321 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700322 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800323 */
Madan Jampani0b847532016-03-03 13:44:15 -0800324 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700325 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800326 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800327 }
328
329 /**
330 * Returns the extraneous group id table for specified device.
331 *
332 * @param deviceId identifier of the device
333 * @return Map representing group key table of given device.
334 */
335 private ConcurrentMap<GroupId, Group>
336 getExtraneousGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700337 return extraneousGroupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800338 }
339
340 /**
341 * Returns the number of groups for the specified device in the store.
342 *
343 * @return number of groups for the specified device
344 */
345 @Override
346 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700347 return (getGroups(deviceId) != null) ?
alshabibb0285992016-03-28 23:30:37 -0700348 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800349 }
350
351 /**
352 * Returns the groups associated with a device.
353 *
354 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800355 * @return the group entries
356 */
357 @Override
358 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800359 // Let ImmutableSet.copyOf do the type conversion
360 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800361 }
362
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700363 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800364 NodeId master = mastershipService.getMasterFor(deviceId);
365 if (master == null) {
366 log.debug("Failed to getGroups: No master for {}", deviceId);
367 return Collections.emptySet();
368 }
369
370 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
371 .stream()
372 .filter(input -> input.deviceId().equals(deviceId))
373 .collect(Collectors.toSet());
374 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700375 }
376
alshabib10580802015-02-18 18:30:33 -0800377 /**
378 * Returns the stored group entry.
379 *
alshabibb0285992016-03-28 23:30:37 -0700380 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800381 * @param appCookie the group key
alshabib10580802015-02-18 18:30:33 -0800382 * @return a group associated with the key
383 */
384 @Override
385 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700386 return getStoredGroupEntry(deviceId, appCookie);
387 }
388
389 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
390 GroupKey appCookie) {
391 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
392 appCookie));
393 }
394
395 @Override
396 public Group getGroup(DeviceId deviceId, GroupId groupId) {
397 return getStoredGroupEntry(deviceId, groupId);
398 }
399
400 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
401 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700402 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800403 }
404
405 private int getFreeGroupIdValue(DeviceId deviceId) {
406 int freeId = groupIdGen.incrementAndGet();
407
408 while (true) {
Yi Tsengfa394de2017-02-01 11:26:40 -0800409 Group existing = getGroup(deviceId, new GroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800410 if (existing == null) {
411 existing = (
412 extraneousGroupEntriesById.get(deviceId) != null) ?
413 extraneousGroupEntriesById.get(deviceId).
Yi Tsengfa394de2017-02-01 11:26:40 -0800414 get(new GroupId(freeId)) :
alshabib10580802015-02-18 18:30:33 -0800415 null;
416 }
417 if (existing != null) {
418 freeId = groupIdGen.incrementAndGet();
419 } else {
420 break;
421 }
422 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700423 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800424 return freeId;
425 }
426
427 /**
428 * Stores a new group entry using the information from group description.
429 *
430 * @param groupDesc group description to be used to create group entry
431 */
432 @Override
433 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700434 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800435 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800436 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
437 if (existingGroup != null) {
Saurav Das95047002018-01-25 09:49:01 -0800438 log.debug("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800439 groupDesc.appCookie(), groupDesc.deviceId(),
440 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800441 return;
442 }
443
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700444 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700445 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700446 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700447 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700448 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
Sivachidambaram Subramanian9f816de2017-06-13 07:16:54 +0530449 log.debug("No Master for device {}..."
450 + "Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700451 groupDesc.deviceId());
Sivachidambaram Subramanian9f816de2017-06-13 07:16:54 +0530452 addToPendingAudit(groupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700453 return;
454 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700455 GroupStoreMessage groupOp = GroupStoreMessage.
456 createGroupAddRequestMsg(groupDesc.deviceId(),
457 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700458
Madan Jampani175e8fd2015-05-20 14:10:45 -0700459 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700460 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
461 clusterMsgSerializer::serialize,
462 mastershipService.getMasterFor(groupDesc.deviceId()))
463 .whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700464 if (error != null) {
465 log.warn("Failed to send request to master: {} to {}",
alshabibb0285992016-03-28 23:30:37 -0700466 groupOp,
467 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700468 //TODO: Send Group operation failure event
469 } else {
470 log.debug("Sent Group operation request for device {} "
alshabibb0285992016-03-28 23:30:37 -0700471 + "to remote MASTER {}",
472 groupDesc.deviceId(),
473 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700474 }
475 });
alshabib10580802015-02-18 18:30:33 -0800476 return;
477 }
478
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700479 log.debug("Store group for device {} is getting handled locally",
480 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800481 storeGroupDescriptionInternal(groupDesc);
482 }
483
Sivachidambaram Subramanian9f816de2017-06-13 07:16:54 +0530484 private void addToPendingAudit(GroupDescription groupDesc) {
485 Integer groupIdVal = groupDesc.givenGroupId();
486 GroupId groupId = (groupIdVal != null) ? new GroupId(groupIdVal) : dummyGroupId;
487 addToPendingKeyTable(new DefaultGroup(groupId, groupDesc));
488 }
489
490 private void addToPendingKeyTable(StoredGroupEntry group) {
491 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
492 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
493 getPendingGroupKeyTable();
494 pendingKeyTable.put(new GroupStoreKeyMapKey(group.deviceId(),
495 group.appCookie()),
496 group);
497 }
498
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700499 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
500 ConcurrentMap<GroupId, Group> extraneousMap =
501 extraneousGroupEntriesById.get(deviceId);
502 if (extraneousMap == null) {
503 return null;
504 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800505 return extraneousMap.get(new GroupId(groupId));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700506 }
507
508 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
509 GroupBuckets buckets) {
510 ConcurrentMap<GroupId, Group> extraneousMap =
511 extraneousGroupEntriesById.get(deviceId);
512 if (extraneousMap == null) {
513 return null;
514 }
515
alshabibb0285992016-03-28 23:30:37 -0700516 for (Group extraneousGroup : extraneousMap.values()) {
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700517 if (extraneousGroup.buckets().equals(buckets)) {
518 return extraneousGroup;
519 }
520 }
521 return null;
522 }
523
alshabib10580802015-02-18 18:30:33 -0800524 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
525 // Check if a group is existing with the same key
526 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
527 return;
528 }
529
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700530 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
531 // Device group audit has not completed yet
532 // Add this group description to pending group key table
533 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700534 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700535 groupDesc.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700536 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
537 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800538 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700539 getPendingGroupKeyTable();
540 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
541 groupDesc.appCookie()),
542 group);
543 return;
544 }
545
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700546 Group matchingExtraneousGroup = null;
547 if (groupDesc.givenGroupId() != null) {
548 //Check if there is a extraneous group existing with the same Id
549 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
alshabibb0285992016-03-28 23:30:37 -0700550 groupDesc.deviceId(), groupDesc.givenGroupId());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700551 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800552 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
alshabibb0285992016-03-28 23:30:37 -0700553 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700554 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800555 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700556 //Check if the group buckets matches with user provided buckets
557 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
558 //Group is already existing with the same buckets and Id
559 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800560 log.debug("storeGroupDescriptionInternal: Buckets also matching "
alshabibb0285992016-03-28 23:30:37 -0700561 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700562 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800563 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700564 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700565 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700566 // Insert the newly created group entry into key and id maps
567 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700568 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
569 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700570 // Ensure it also inserted into group id based table to
571 // avoid any chances of duplication in group id generation
572 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700573 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700574 addOrUpdateGroupEntry(matchingExtraneousGroup);
575 removeExtraneousGroupEntry(matchingExtraneousGroup);
576 return;
577 } else {
578 //Group buckets are not matching. Update group
579 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800580 log.debug("storeGroupDescriptionInternal: Buckets are not "
alshabibb0285992016-03-28 23:30:37 -0700581 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700582 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800583 Integer.toHexString(groupDesc.givenGroupId()));
584 StoredGroupEntry modifiedGroup = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700585 matchingExtraneousGroup.id(), groupDesc);
Saurav Das0fd79d92016-03-07 10:58:36 -0800586 modifiedGroup.setState(GroupState.PENDING_UPDATE);
587 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700588 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
589 groupDesc.appCookie()), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800590 // Ensure it also inserted into group id based table to
591 // avoid any chances of duplication in group id generation
592 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700593 put(matchingExtraneousGroup.id(), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800594 removeExtraneousGroupEntry(matchingExtraneousGroup);
595 log.debug("storeGroupDescriptionInternal: Triggering Group "
alshabibb0285992016-03-28 23:30:37 -0700596 + "UPDATE request for {} in device {}",
Saurav Das0fd79d92016-03-07 10:58:36 -0800597 matchingExtraneousGroup.id(),
598 groupDesc.deviceId());
599 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
600 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700601 }
602 }
603 } else {
604 //Check if there is an extraneous group with user provided buckets
605 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
alshabibb0285992016-03-28 23:30:37 -0700606 groupDesc.deviceId(), groupDesc.buckets());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700607 if (matchingExtraneousGroup != null) {
608 //Group is already existing with the same buckets.
609 //So reuse this group.
610 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
611 groupDesc.deviceId());
612 //Create a group entry object
613 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700614 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700615 // Insert the newly created group entry into key and id maps
616 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700617 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
618 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700619 // Ensure it also inserted into group id based table to
620 // avoid any chances of duplication in group id generation
621 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700622 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700623 addOrUpdateGroupEntry(matchingExtraneousGroup);
624 removeExtraneousGroupEntry(matchingExtraneousGroup);
625 return;
626 } else {
627 //TODO: Check if there are any empty groups that can be used here
628 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
629 groupDesc.deviceId());
630 }
631 }
632
Saurav Das100e3b82015-04-30 11:12:10 -0700633 GroupId id = null;
634 if (groupDesc.givenGroupId() == null) {
635 // Get a new group identifier
Yi Tsengfa394de2017-02-01 11:26:40 -0800636 id = new GroupId(getFreeGroupIdValue(groupDesc.deviceId()));
Saurav Das100e3b82015-04-30 11:12:10 -0700637 } else {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800638 // we need to use the identifier passed in by caller, but check if
639 // already used
640 Group existing = getGroup(groupDesc.deviceId(),
Yi Tsengfa394de2017-02-01 11:26:40 -0800641 new GroupId(groupDesc.givenGroupId()));
Saurav Das8be4e3a2016-03-11 17:19:07 -0800642 if (existing != null) {
643 log.warn("Group already exists with the same id: 0x{} in dev:{} "
alshabibb0285992016-03-28 23:30:37 -0700644 + "but with different key: {} (request gkey: {})",
645 Integer.toHexString(groupDesc.givenGroupId()),
646 groupDesc.deviceId(),
647 existing.appCookie(),
648 groupDesc.appCookie());
Saurav Das8be4e3a2016-03-11 17:19:07 -0800649 return;
650 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800651 id = new GroupId(groupDesc.givenGroupId());
Saurav Das100e3b82015-04-30 11:12:10 -0700652 }
alshabib10580802015-02-18 18:30:33 -0800653 // Create a group entry object
654 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700655 // Insert the newly created group entry into key and id maps
656 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700657 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
658 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700659 // Ensure it also inserted into group id based table to
660 // avoid any chances of duplication in group id generation
661 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700662 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700663 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700664 id,
665 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800666 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
667 group));
668 }
669
670 /**
671 * Updates the existing group entry with the information
672 * from group description.
673 *
alshabibb0285992016-03-28 23:30:37 -0700674 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800675 * @param oldAppCookie the current group key
alshabibb0285992016-03-28 23:30:37 -0700676 * @param type update type
677 * @param newBuckets group buckets for updates
alshabib10580802015-02-18 18:30:33 -0800678 * @param newAppCookie optional new group key
679 */
680 @Override
681 public void updateGroupDescription(DeviceId deviceId,
682 GroupKey oldAppCookie,
683 UpdateType type,
684 GroupBuckets newBuckets,
685 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700686 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700687 if (mastershipService.getMasterFor(deviceId) != null &&
688 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700689 log.debug("updateGroupDescription: Device {} local role is not MASTER",
690 deviceId);
691 if (mastershipService.getMasterFor(deviceId) == null) {
692 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700693 + "Can not perform update group operation",
694 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700695 //TODO: Send Group operation failure event
696 return;
697 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700698 GroupStoreMessage groupOp = GroupStoreMessage.
699 createGroupUpdateRequestMsg(deviceId,
700 oldAppCookie,
701 type,
702 newBuckets,
703 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700704
Madan Jampani175e8fd2015-05-20 14:10:45 -0700705 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700706 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
707 clusterMsgSerializer::serialize,
708 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
709 if (error != null) {
710 log.warn("Failed to send request to master: {} to {}",
711 groupOp,
712 mastershipService.getMasterFor(deviceId), error);
713 }
714 //TODO: Send Group operation failure event
715 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700716 return;
717 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700718 log.debug("updateGroupDescription for device {} is getting handled locally",
719 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700720 updateGroupDescriptionInternal(deviceId,
721 oldAppCookie,
722 type,
723 newBuckets,
724 newAppCookie);
725 }
726
727 private void updateGroupDescriptionInternal(DeviceId deviceId,
alshabibb0285992016-03-28 23:30:37 -0700728 GroupKey oldAppCookie,
729 UpdateType type,
730 GroupBuckets newBuckets,
731 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800732 // Check if a group is existing with the provided key
733 Group oldGroup = getGroup(deviceId, oldAppCookie);
734 if (oldGroup == null) {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800735 log.warn("updateGroupDescriptionInternal: Group not found...strange. "
alshabibb0285992016-03-28 23:30:37 -0700736 + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
alshabib10580802015-02-18 18:30:33 -0800737 return;
738 }
739
740 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
741 type,
742 newBuckets);
743 if (newBucketList != null) {
744 // Create a new group object from the old group
745 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
746 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
747 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
748 oldGroup.deviceId(),
749 oldGroup.type(),
750 updatedBuckets,
751 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700752 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800753 oldGroup.appId());
754 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
755 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700756 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
alshabibb0285992016-03-28 23:30:37 -0700757 oldGroup.id(),
758 oldGroup.deviceId(),
759 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800760 newGroup.setState(GroupState.PENDING_UPDATE);
761 newGroup.setLife(oldGroup.life());
762 newGroup.setPackets(oldGroup.packets());
763 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700764 //Update the group entry in groupkey based map.
765 //Update to groupid based map will happen in the
766 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700767 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
768 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700769 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700770 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
771 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800772 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700773 } else {
774 log.warn("updateGroupDescriptionInternal with type {}: No "
alshabibb0285992016-03-28 23:30:37 -0700775 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800776 }
777 }
778
779 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
780 UpdateType type,
781 GroupBuckets buckets) {
Victor Silva0282ab82016-11-15 16:30:27 -0300782 if (type == UpdateType.SET) {
783 return buckets.buckets();
784 }
785
Victor Silvadf1eeae2016-08-12 15:28:57 -0300786 List<GroupBucket> oldBuckets = oldGroup.buckets().buckets();
787 List<GroupBucket> updatedBucketList = new ArrayList<>();
alshabib10580802015-02-18 18:30:33 -0800788 boolean groupDescUpdated = false;
789
790 if (type == UpdateType.ADD) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300791 List<GroupBucket> newBuckets = buckets.buckets();
792
793 // Add old buckets that will not be updated and check if any will be updated.
794 for (GroupBucket oldBucket : oldBuckets) {
795 int newBucketIndex = newBuckets.indexOf(oldBucket);
796
797 if (newBucketIndex != -1) {
798 GroupBucket newBucket = newBuckets.get(newBucketIndex);
799 if (!newBucket.hasSameParameters(oldBucket)) {
800 // Bucket will be updated
801 groupDescUpdated = true;
802 }
803 } else {
804 // Old bucket will remain the same - add it.
805 updatedBucketList.add(oldBucket);
alshabib10580802015-02-18 18:30:33 -0800806 }
807 }
Victor Silvadf1eeae2016-08-12 15:28:57 -0300808
809 // Add all new buckets
810 updatedBucketList.addAll(newBuckets);
811 if (!oldBuckets.containsAll(newBuckets)) {
812 groupDescUpdated = true;
813 }
814
alshabib10580802015-02-18 18:30:33 -0800815 } else if (type == UpdateType.REMOVE) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300816 List<GroupBucket> bucketsToRemove = buckets.buckets();
817
818 // Check which old buckets should remain
819 for (GroupBucket oldBucket : oldBuckets) {
820 if (!bucketsToRemove.contains(oldBucket)) {
821 updatedBucketList.add(oldBucket);
822 } else {
alshabib10580802015-02-18 18:30:33 -0800823 groupDescUpdated = true;
824 }
825 }
826 }
827
828 if (groupDescUpdated) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300829 return updatedBucketList;
alshabib10580802015-02-18 18:30:33 -0800830 } else {
831 return null;
832 }
833 }
834
835 /**
836 * Triggers deleting the existing group entry.
837 *
alshabibb0285992016-03-28 23:30:37 -0700838 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800839 * @param appCookie the group key
840 */
841 @Override
842 public void deleteGroupDescription(DeviceId deviceId,
843 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700844 // Check if group to be deleted by a remote instance
845 if (mastershipService.
846 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700847 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
848 deviceId);
849 if (mastershipService.getMasterFor(deviceId) == null) {
850 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700851 + "Can not perform delete group operation",
852 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700853 //TODO: Send Group operation failure event
854 return;
855 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700856 GroupStoreMessage groupOp = GroupStoreMessage.
857 createGroupDeleteRequestMsg(deviceId,
858 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700859
Madan Jampani175e8fd2015-05-20 14:10:45 -0700860 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700861 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
862 clusterMsgSerializer::serialize,
863 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
864 if (error != null) {
865 log.warn("Failed to send request to master: {} to {}",
866 groupOp,
867 mastershipService.getMasterFor(deviceId), error);
868 }
869 //TODO: Send Group operation failure event
870 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700871 return;
872 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700873 log.debug("deleteGroupDescription in device {} is getting handled locally",
874 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700875 deleteGroupDescriptionInternal(deviceId, appCookie);
876 }
877
878 private void deleteGroupDescriptionInternal(DeviceId deviceId,
879 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800880 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700881 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800882 if (existing == null) {
883 return;
884 }
885
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700886 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
alshabibb0285992016-03-28 23:30:37 -0700887 existing.id(),
888 existing.deviceId(),
889 existing.state());
alshabib10580802015-02-18 18:30:33 -0800890 synchronized (existing) {
891 existing.setState(GroupState.PENDING_DELETE);
Saurav Das80980c72016-03-23 11:22:49 -0700892 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700893 put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
894 existing);
alshabib10580802015-02-18 18:30:33 -0800895 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700896 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
897 deviceId);
alshabib10580802015-02-18 18:30:33 -0800898 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
899 }
900
901 /**
902 * Stores a new group entry, or updates an existing entry.
903 *
904 * @param group group entry
905 */
906 @Override
907 public void addOrUpdateGroupEntry(Group group) {
908 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700909 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
910 group.id());
alshabib10580802015-02-18 18:30:33 -0800911 GroupEvent event = null;
912
913 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800914 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700915 group.id(),
916 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800917 synchronized (existing) {
alshabibb0285992016-03-28 23:30:37 -0700918 for (GroupBucket bucket : group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700919 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700920 existing.buckets().buckets()
alshabibb0285992016-03-28 23:30:37 -0700921 .stream()
922 .filter((existingBucket) -> (existingBucket.equals(bucket)))
923 .findFirst();
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700924 if (matchingBucket.isPresent()) {
925 ((StoredGroupBucketEntry) matchingBucket.
926 get()).setPackets(bucket.packets());
927 ((StoredGroupBucketEntry) matchingBucket.
928 get()).setBytes(bucket.bytes());
929 } else {
930 log.warn("addOrUpdateGroupEntry: No matching "
alshabibb0285992016-03-28 23:30:37 -0700931 + "buckets to update stats");
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700932 }
933 }
alshabib10580802015-02-18 18:30:33 -0800934 existing.setLife(group.life());
935 existing.setPackets(group.packets());
936 existing.setBytes(group.bytes());
alshabibb0285992016-03-28 23:30:37 -0700937 existing.setReferenceCount(group.referenceCount());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700938 if ((existing.state() == GroupState.PENDING_ADD) ||
alshabibb0285992016-03-28 23:30:37 -0700939 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800940 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700941 existing.id(),
942 existing.deviceId(),
943 existing.state());
alshabib10580802015-02-18 18:30:33 -0800944 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700945 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800946 event = new GroupEvent(Type.GROUP_ADDED, existing);
947 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800948 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700949 existing.id(),
950 existing.deviceId(),
951 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700952 existing.setState(GroupState.ADDED);
953 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800954 event = new GroupEvent(Type.GROUP_UPDATED, existing);
955 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700956 //Re-PUT map entries to trigger map update events
957 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700958 put(new GroupStoreKeyMapKey(existing.deviceId(),
959 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800960 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700961 } else {
962 log.warn("addOrUpdateGroupEntry: Group update "
alshabibb0285992016-03-28 23:30:37 -0700963 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800964 }
965
966 if (event != null) {
967 notifyDelegate(event);
968 }
969 }
970
971 /**
972 * Removes the group entry from store.
973 *
974 * @param group group entry
975 */
976 @Override
977 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700978 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
979 group.id());
alshabib10580802015-02-18 18:30:33 -0800980
981 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700982 log.debug("removeGroupEntry: removing group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700983 group.id(),
984 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700985 //Removal from groupid based map will happen in the
986 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700987 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
988 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800989 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700990 } else {
991 log.warn("removeGroupEntry for {} in device{} is "
alshabibb0285992016-03-28 23:30:37 -0700992 + "not existing in our maps",
993 group.id(),
994 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800995 }
996 }
997
Victor Silva4e8b7832016-08-17 17:11:19 -0300998 private void purgeGroupEntries(Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entries) {
999 entries.forEach(entry -> {
1000 groupStoreEntriesByKey.remove(entry.getKey());
1001 });
1002 }
1003
alshabib10580802015-02-18 18:30:33 -08001004 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -08001005 public void purgeGroupEntry(DeviceId deviceId) {
Victor Silva4e8b7832016-08-17 17:11:19 -03001006 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
Charles Chan0c7c43b2016-01-14 17:39:20 -08001007 new HashSet<>();
1008
Madan Jampani0b847532016-03-03 13:44:15 -08001009 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -08001010 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Victor Silva4e8b7832016-08-17 17:11:19 -03001011 .forEach(entriesPendingRemove::add);
Charles Chan0c7c43b2016-01-14 17:39:20 -08001012
Victor Silva4e8b7832016-08-17 17:11:19 -03001013 purgeGroupEntries(entriesPendingRemove);
1014 }
1015
1016 @Override
1017 public void purgeGroupEntries() {
1018 purgeGroupEntries(getGroupStoreKeyMap().entrySet());
Charles Chan0c7c43b2016-01-14 17:39:20 -08001019 }
1020
1021 @Override
alshabib10580802015-02-18 18:30:33 -08001022 public void deviceInitialAuditCompleted(DeviceId deviceId,
1023 boolean completed) {
1024 synchronized (deviceAuditStatus) {
1025 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001026 log.debug("AUDIT completed for device {}",
1027 deviceId);
alshabib10580802015-02-18 18:30:33 -08001028 deviceAuditStatus.put(deviceId, true);
1029 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001030 List<StoredGroupEntry> pendingGroupRequests =
1031 getPendingGroupKeyTable().values()
alshabibb0285992016-03-28 23:30:37 -07001032 .stream()
1033 .filter(g -> g.deviceId().equals(deviceId))
1034 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001035 log.debug("processing pending group add requests for device {} and number of pending requests {}",
alshabibb0285992016-03-28 23:30:37 -07001036 deviceId,
1037 pendingGroupRequests.size());
1038 for (Group group : pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -08001039 GroupDescription tmp = new DefaultGroupDescription(
1040 group.deviceId(),
1041 group.type(),
1042 group.buckets(),
1043 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -07001044 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -08001045 group.appId());
1046 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001047 getPendingGroupKeyTable().
alshabibb0285992016-03-28 23:30:37 -07001048 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -08001049 }
alshabib10580802015-02-18 18:30:33 -08001050 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001051 Boolean audited = deviceAuditStatus.get(deviceId);
1052 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001053 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -08001054 deviceAuditStatus.put(deviceId, false);
1055 }
1056 }
1057 }
1058 }
1059
1060 @Override
1061 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
1062 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001063 Boolean audited = deviceAuditStatus.get(deviceId);
1064 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -08001065 }
1066 }
1067
1068 @Override
1069 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
1070
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001071 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
1072 operation.groupId());
alshabib10580802015-02-18 18:30:33 -08001073
1074 if (existing == null) {
1075 log.warn("No group entry with ID {} found ", operation.groupId());
1076 return;
1077 }
1078
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001079 log.warn("groupOperationFailed: group operation {} failed"
alshabibb0285992016-03-28 23:30:37 -07001080 + "for group {} in device {} with code {}",
1081 operation.opType(),
1082 existing.id(),
1083 existing.deviceId(),
1084 operation.failureCode());
Saurav Das0fd79d92016-03-07 10:58:36 -08001085 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
Saurav Das8be4e3a2016-03-11 17:19:07 -08001086 if (operation.buckets().equals(existing.buckets())) {
Saurav Dasc88d4662017-05-15 15:34:25 -07001087 if (existing.state() == GroupState.PENDING_ADD ||
1088 existing.state() == GroupState.PENDING_ADD_RETRY) {
Saurav Das8be4e3a2016-03-11 17:19:07 -08001089 log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
alshabibb0285992016-03-28 23:30:37 -07001090 + "add state - moving to ADDED for group {} in device {}",
1091 existing.id(), deviceId);
Saurav Das8be4e3a2016-03-11 17:19:07 -08001092 addOrUpdateGroupEntry(existing);
1093 return;
1094 } else {
Saurav Dasc88d4662017-05-15 15:34:25 -07001095 log.warn("GROUP_EXISTS: GroupId and Buckets match but existing"
1096 + "group in state: {}", existing.state());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001097 }
Saurav Dasc88d4662017-05-15 15:34:25 -07001098 } else {
1099 log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
1100 + "Operation: {} Existing: {}", operation.buckets(),
1101 existing.buckets());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001102 }
Saurav Das0fd79d92016-03-07 10:58:36 -08001103 }
alshabib10580802015-02-18 18:30:33 -08001104 switch (operation.opType()) {
1105 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001106 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001107 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
1108 log.warn("groupOperationFailed: cleaningup "
alshabibb0285992016-03-28 23:30:37 -07001109 + "group {} from store in device {}....",
1110 existing.id(),
1111 existing.deviceId());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001112 //Removal from groupid based map will happen in the
1113 //map update listener
1114 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
1115 existing.appCookie()));
1116 }
alshabib10580802015-02-18 18:30:33 -08001117 break;
1118 case MODIFY:
1119 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
1120 break;
1121 case DELETE:
1122 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
1123 break;
1124 default:
1125 log.warn("Unknown group operation type {}", operation.opType());
1126 }
alshabib10580802015-02-18 18:30:33 -08001127 }
1128
1129 @Override
1130 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001131 log.debug("add/update extraneous group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001132 group.id(),
1133 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001134 ConcurrentMap<GroupId, Group> extraneousIdTable =
1135 getExtraneousGroupIdTable(group.deviceId());
1136 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -07001137 // Don't remove the extraneous groups, instead re-use it when
1138 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -08001139 }
1140
1141 @Override
1142 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001143 log.debug("remove extraneous group entry {} of device {} from store",
alshabibb0285992016-03-28 23:30:37 -07001144 group.id(),
1145 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001146 ConcurrentMap<GroupId, Group> extraneousIdTable =
1147 getExtraneousGroupIdTable(group.deviceId());
1148 extraneousIdTable.remove(group.id());
1149 }
1150
1151 @Override
1152 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
1153 // flatten and make iterator unmodifiable
1154 return FluentIterable.from(
1155 getExtraneousGroupIdTable(deviceId).values());
1156 }
1157
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001158 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001159 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001160 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001161 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001162 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001163
1164 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001165 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001166 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001167 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001168 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001169 if ((key == null) && (group == null)) {
1170 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001171 + "event {} with null entry", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001172 return;
1173 } else if (group == null) {
1174 group = getGroupIdTable(key.deviceId()).values()
1175 .stream()
1176 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
Yuta HIGUCHI6e5f4702016-11-21 11:42:11 -08001177 .findFirst().orElse(null);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001178 if (group == null) {
1179 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001180 + "event {} with null entry... can not process", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001181 return;
1182 }
1183 }
1184 log.trace("received groupid map event {} for id {} in device {}",
1185 mapEvent.type(),
1186 group.id(),
jaegonkim68e080c2016-12-01 22:31:01 +09001187 (key != null ? key.deviceId() : null));
Madan Jampani0b847532016-03-03 13:44:15 -08001188 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001189 // Update the group ID table
1190 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001191 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1192 if (value.state() == Group.GroupState.ADDED) {
1193 if (value.isGroupStateAddedFirstTime()) {
1194 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001195 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001196 group.id(),
1197 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001198 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001199 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001200 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001201 group.id(),
1202 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001203 }
1204 }
Madan Jampani0b847532016-03-03 13:44:15 -08001205 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001206 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001207 // Remove the entry from the group ID table
1208 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001209 }
1210
1211 if (groupEvent != null) {
1212 notifyDelegate(groupEvent);
1213 }
1214 }
1215 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001216
helenyrwua1c41152016-08-18 16:16:14 -07001217 private void processGroupMessage(GroupStoreMessage message) {
1218 if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1219 // FIXME: groupStoreEntriesByKey inaccessible here
1220 getGroupIdTable(message.deviceId()).values()
1221 .stream()
1222 .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1223 .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1224 }
1225 }
1226
Madan Jampani01e05fb2015-08-13 13:29:36 -07001227 private void process(GroupStoreMessage groupOp) {
1228 log.debug("Received remote group operation {} request for device {}",
alshabibb0285992016-03-28 23:30:37 -07001229 groupOp.type(),
1230 groupOp.deviceId());
1231 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1232 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1233 return;
1234 }
1235 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1236 storeGroupDescriptionInternal(groupOp.groupDesc());
1237 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1238 updateGroupDescriptionInternal(groupOp.deviceId(),
1239 groupOp.appCookie(),
1240 groupOp.updateType(),
1241 groupOp.updateBuckets(),
1242 groupOp.newAppCookie());
1243 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1244 deleteGroupDescriptionInternal(groupOp.deviceId(),
1245 groupOp.appCookie());
1246 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001247 }
1248
1249 /**
1250 * Flattened map key to be used to store group entries.
1251 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001252 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001253 private final DeviceId deviceId;
1254
1255 public GroupStoreMapKey(DeviceId deviceId) {
1256 this.deviceId = deviceId;
1257 }
1258
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001259 public DeviceId deviceId() {
1260 return deviceId;
1261 }
1262
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001263 @Override
1264 public boolean equals(Object o) {
1265 if (this == o) {
1266 return true;
1267 }
1268 if (!(o instanceof GroupStoreMapKey)) {
1269 return false;
1270 }
1271 GroupStoreMapKey that = (GroupStoreMapKey) o;
1272 return this.deviceId.equals(that.deviceId);
1273 }
1274
1275 @Override
1276 public int hashCode() {
1277 int result = 17;
1278
1279 result = 31 * result + Objects.hash(this.deviceId);
1280
1281 return result;
1282 }
1283 }
1284
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001285 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001286 private final GroupKey appCookie;
alshabibb0285992016-03-28 23:30:37 -07001287
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001288 public GroupStoreKeyMapKey(DeviceId deviceId,
1289 GroupKey appCookie) {
1290 super(deviceId);
1291 this.appCookie = appCookie;
1292 }
1293
1294 @Override
1295 public boolean equals(Object o) {
1296 if (this == o) {
1297 return true;
1298 }
1299 if (!(o instanceof GroupStoreKeyMapKey)) {
1300 return false;
1301 }
1302 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1303 return (super.equals(that) &&
1304 this.appCookie.equals(that.appCookie));
1305 }
1306
1307 @Override
1308 public int hashCode() {
1309 int result = 17;
1310
1311 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1312
1313 return result;
1314 }
1315 }
1316
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001317 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001318 private final GroupId groupId;
alshabibb0285992016-03-28 23:30:37 -07001319
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001320 public GroupStoreIdMapKey(DeviceId deviceId,
1321 GroupId groupId) {
1322 super(deviceId);
1323 this.groupId = groupId;
1324 }
1325
1326 @Override
1327 public boolean equals(Object o) {
1328 if (this == o) {
1329 return true;
1330 }
1331 if (!(o instanceof GroupStoreIdMapKey)) {
1332 return false;
1333 }
1334 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1335 return (super.equals(that) &&
1336 this.groupId.equals(that.groupId));
1337 }
1338
1339 @Override
1340 public int hashCode() {
1341 int result = 17;
1342
1343 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1344
1345 return result;
1346 }
1347 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001348
1349 @Override
1350 public void pushGroupMetrics(DeviceId deviceId,
1351 Collection<Group> groupEntries) {
1352 boolean deviceInitialAuditStatus =
1353 deviceInitialAuditStatus(deviceId);
1354 Set<Group> southboundGroupEntries =
1355 Sets.newHashSet(groupEntries);
1356 Set<StoredGroupEntry> storedGroupEntries =
1357 Sets.newHashSet(getStoredGroups(deviceId));
1358 Set<Group> extraneousStoredEntries =
1359 Sets.newHashSet(getExtraneousGroups(deviceId));
1360
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001361 if (log.isTraceEnabled()) {
1362 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1363 southboundGroupEntries.size(),
1364 deviceId);
1365 for (Group group : southboundGroupEntries) {
1366 log.trace("Group {} in device {}", group, deviceId);
1367 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001368
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001369 log.trace("Displaying all ({}) stored group entries for device {}",
1370 storedGroupEntries.size(),
1371 deviceId);
1372 for (StoredGroupEntry group : storedGroupEntries) {
1373 log.trace("Stored Group {} for device {}", group, deviceId);
1374 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001375 }
1376
alshabibb0285992016-03-28 23:30:37 -07001377 garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
1378
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001379 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1380 Group group = it2.next();
1381 if (storedGroupEntries.remove(group)) {
1382 // we both have the group, let's update some info then.
1383 log.trace("Group AUDIT: group {} exists in both planes for device {}",
alshabibb0285992016-03-28 23:30:37 -07001384 group.id(), deviceId);
1385
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001386 groupAdded(group);
1387 it2.remove();
1388 }
1389 }
1390 for (Group group : southboundGroupEntries) {
1391 if (getGroup(group.deviceId(), group.id()) != null) {
1392 // There is a group existing with the same id
1393 // It is possible that group update is
1394 // in progress while we got a stale info from switch
1395 if (!storedGroupEntries.remove(getGroup(
alshabibb0285992016-03-28 23:30:37 -07001396 group.deviceId(), group.id()))) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001397 log.warn("Group AUDIT: Inconsistent state:"
alshabibb0285992016-03-28 23:30:37 -07001398 + "Group exists in ID based table while "
1399 + "not present in key based table");
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001400 }
1401 } else {
1402 // there are groups in the switch that aren't in the store
1403 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001404 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001405 extraneousStoredEntries.remove(group);
Kavitha Alagesanc56cded2017-01-13 10:48:18 +05301406 if (allowExtraneousGroups) {
1407 extraneousGroup(group);
1408 } else {
1409 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
1410 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001411 }
1412 }
1413 for (Group group : storedGroupEntries) {
1414 // there are groups in the store that aren't in the switch
1415 log.debug("Group AUDIT: group {} missing in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001416 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001417 groupMissing(group);
1418 }
1419 for (Group group : extraneousStoredEntries) {
1420 // there are groups in the extraneous store that
1421 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001422 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
alshabibb0285992016-03-28 23:30:37 -07001423 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001424 removeExtraneousGroupEntry(group);
1425 }
1426
1427 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001428 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
alshabibb0285992016-03-28 23:30:37 -07001429 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001430 deviceInitialAuditCompleted(deviceId, true);
1431 }
1432 }
1433
helenyrwu89470f12016-08-12 13:18:10 -07001434 @Override
1435 public void notifyOfFailovers(Collection<Group> failoverGroups) {
helenyrwu89470f12016-08-12 13:18:10 -07001436 failoverGroups.forEach(group -> {
1437 if (group.type() == Group.Type.FAILOVER) {
helenyrwua1c41152016-08-18 16:16:14 -07001438 groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1439 group.deviceId(), group));
helenyrwu89470f12016-08-12 13:18:10 -07001440 }
1441 });
helenyrwu89470f12016-08-12 13:18:10 -07001442 }
1443
alshabibb0285992016-03-28 23:30:37 -07001444 private void garbageCollect(DeviceId deviceId,
1445 Set<Group> southboundGroupEntries,
1446 Set<StoredGroupEntry> storedGroupEntries) {
1447 if (!garbageCollect) {
1448 return;
1449 }
1450
1451 Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
1452 while (it.hasNext()) {
1453 StoredGroupEntry group = it.next();
1454 if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
1455 log.debug("Garbage collecting group {} on {}", group, deviceId);
1456 deleteGroupDescription(deviceId, group.appCookie());
1457 southboundGroupEntries.remove(group);
1458 it.remove();
1459 }
1460 }
1461 }
1462
1463 private boolean checkGroupRefCount(Group group) {
1464 return (group.referenceCount() == 0 && group.age() >= gcThresh);
1465 }
1466
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001467 private void groupMissing(Group group) {
1468 switch (group.state()) {
1469 case PENDING_DELETE:
1470 log.debug("Group {} delete confirmation from device {}",
1471 group, group.deviceId());
1472 removeGroupEntry(group);
1473 break;
1474 case ADDED:
1475 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001476 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001477 case PENDING_UPDATE:
1478 log.debug("Group {} is in store but not on device {}",
1479 group, group.deviceId());
1480 StoredGroupEntry existing =
1481 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001482 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
alshabibb0285992016-03-28 23:30:37 -07001483 existing.id(),
1484 existing.deviceId(),
1485 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001486 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001487 //Re-PUT map entries to trigger map update events
1488 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -07001489 put(new GroupStoreKeyMapKey(existing.deviceId(),
1490 existing.appCookie()), existing);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001491 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1492 group));
1493 break;
1494 default:
1495 log.debug("Group {} has not been installed.", group);
1496 break;
1497 }
1498 }
1499
1500 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001501 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001502 group, group.deviceId());
1503 addOrUpdateExtraneousGroupEntry(group);
1504 }
1505
1506 private void groupAdded(Group group) {
1507 log.trace("Group {} Added or Updated in device {}",
1508 group, group.deviceId());
1509 addOrUpdateGroupEntry(group);
1510 }
alshabib10580802015-02-18 18:30:33 -08001511}