blob: 58417a806d423912af212779040ccafd036eea22 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
Charles Chanf4838a72015-12-07 18:13:45 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070020import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070021import com.google.common.collect.Sets;
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
alshabibb0285992016-03-28 23:30:37 -070025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080029import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onlab.util.KryoNamespace;
alshabibb0285992016-03-28 23:30:37 -070031import org.onosproject.cfg.ComponentConfigService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.cluster.ClusterService;
Charles Chanf4838a72015-12-07 18:13:45 -080033import org.onosproject.cluster.NodeId;
alshabib10580802015-02-18 18:30:33 -080034import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070035import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080036import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070037import org.onosproject.net.MastershipRole;
jaegonkim9477a9d2018-04-01 16:36:36 +090038import org.onosproject.net.driver.DriverService;
alshabib10580802015-02-18 18:30:33 -080039import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070040import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080041import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070042import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080043import org.onosproject.net.group.Group;
44import org.onosproject.net.group.Group.GroupState;
45import org.onosproject.net.group.GroupBucket;
46import org.onosproject.net.group.GroupBuckets;
47import org.onosproject.net.group.GroupDescription;
48import org.onosproject.net.group.GroupEvent;
49import org.onosproject.net.group.GroupEvent.Type;
50import org.onosproject.net.group.GroupKey;
51import org.onosproject.net.group.GroupOperation;
52import org.onosproject.net.group.GroupStore;
53import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070054import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080055import org.onosproject.net.group.StoredGroupEntry;
56import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070057import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070058import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani0b847532016-03-03 13:44:15 -080059import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.MapEventListener;
alshabibb0285992016-03-28 23:30:37 -070062import org.onosproject.store.service.MultiValuedTimestamp;
Madan Jampani0b847532016-03-03 13:44:15 -080063import org.onosproject.store.service.Serializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070064import org.onosproject.store.service.StorageService;
helenyrwua1c41152016-08-18 16:16:14 -070065import org.onosproject.store.service.Topic;
Madan Jampani0b847532016-03-03 13:44:15 -080066import org.onosproject.store.service.Versioned;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053067import org.onosproject.store.service.DistributedPrimitive.Status;
alshabibb0285992016-03-28 23:30:37 -070068import org.osgi.service.component.ComponentContext;
alshabib10580802015-02-18 18:30:33 -080069import org.slf4j.Logger;
70
Jonathan Hart6ec029a2015-03-24 17:12:35 -070071import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070072import java.util.Collection;
Charles Chanf4838a72015-12-07 18:13:45 -080073import java.util.Collections;
alshabibb0285992016-03-28 23:30:37 -070074import java.util.Dictionary;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070075import java.util.HashMap;
Charles Chan0c7c43b2016-01-14 17:39:20 -080076import java.util.HashSet;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070077import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.util.List;
Madan Jampani0b847532016-03-03 13:44:15 -080079import java.util.Map;
Charles Chan0c7c43b2016-01-14 17:39:20 -080080import java.util.Map.Entry;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070082import java.util.Optional;
alshabibb0285992016-03-28 23:30:37 -070083import java.util.Properties;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070084import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070085import java.util.concurrent.ConcurrentHashMap;
86import java.util.concurrent.ConcurrentMap;
87import java.util.concurrent.ExecutorService;
88import java.util.concurrent.Executors;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053089import java.util.concurrent.ScheduledExecutorService;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070090import java.util.concurrent.atomic.AtomicInteger;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053091import java.util.function.Consumer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070092import java.util.stream.Collectors;
93
alshabibb0285992016-03-28 23:30:37 -070094import static com.google.common.base.Strings.isNullOrEmpty;
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +053095import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibb0285992016-03-28 23:30:37 -070096import static org.onlab.util.Tools.get;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070097import static org.onlab.util.Tools.groupedThreads;
98import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080099
100/**
Saurav Das0fd79d92016-03-07 10:58:36 -0800101 * Manages inventory of group entries using distributed group stores from the
102 * storage service.
alshabib10580802015-02-18 18:30:33 -0800103 */
104@Component(immediate = true)
105@Service
106public class DistributedGroupStore
107 extends AbstractStore<GroupEvent, GroupStoreDelegate>
108 implements GroupStore {
109
110 private final Logger log = getLogger(getClass());
111
alshabibb0285992016-03-28 23:30:37 -0700112 private static final boolean GARBAGE_COLLECT = false;
113 private static final int GC_THRESH = 6;
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530114 private static final boolean ALLOW_EXTRANEOUS_GROUPS = true;
alshabibb0285992016-03-28 23:30:37 -0700115
alshabib10580802015-02-18 18:30:33 -0800116 private final int dummyId = 0xffffffff;
Yi Tsengfa394de2017-02-01 11:26:40 -0800117 private final GroupId dummyGroupId = new GroupId(dummyId);
alshabib10580802015-02-18 18:30:33 -0800118
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ClusterCommunicationService clusterCommunicator;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected ClusterService clusterService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700129 protected MastershipService mastershipService;
130
alshabibb0285992016-03-28 23:30:37 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected ComponentConfigService cfgService;
133
jaegonkim9477a9d2018-04-01 16:36:36 +0900134 // Guarantees enabling DriverService before enabling GroupStore
135 // (DriverService is used in serializing/de-serializing DefaultGroup)
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected DriverService driverService;
138
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530139 private ScheduledExecutorService executor;
140 private Consumer<Status> statusChangeListener;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700141 // Per device group table with (device id + app cookie) as key
Madan Jampani0b847532016-03-03 13:44:15 -0800142 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700143 StoredGroupEntry> groupStoreEntriesByKey = null;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700144 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700145 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
alshabibb0285992016-03-28 23:30:37 -0700146 groupEntriesById = new ConcurrentHashMap<>();
Madan Jampani0b847532016-03-03 13:44:15 -0800147 private ConsistentMap<GroupStoreKeyMapKey,
alshabibb0285992016-03-28 23:30:37 -0700148 StoredGroupEntry> auditPendingReqQueue = null;
Frank Wange0eb5ce2016-07-01 18:21:25 +0800149 private MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry>
150 mapListener = new GroupStoreKeyMapListener();
alshabib10580802015-02-18 18:30:33 -0800151 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
152 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700153 private ExecutorService messageHandlingExecutor;
154 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700155 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800156
157 private final AtomicInteger groupIdGen = new AtomicInteger();
158
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700159 private KryoNamespace clusterMsgSerializer;
160
helenyrwua1c41152016-08-18 16:16:14 -0700161 private static Topic<GroupStoreMessage> groupTopic;
162
alshabibb0285992016-03-28 23:30:37 -0700163 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
164 label = "Enable group garbage collection")
165 private boolean garbageCollect = GARBAGE_COLLECT;
166
167 @Property(name = "gcThresh", intValue = GC_THRESH,
168 label = "Number of rounds for group garbage collection")
169 private int gcThresh = GC_THRESH;
170
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530171 @Property(name = "allowExtraneousGroups", boolValue = ALLOW_EXTRANEOUS_GROUPS,
172 label = "Allow groups in switches not installed by ONOS")
173 private boolean allowExtraneousGroups = ALLOW_EXTRANEOUS_GROUPS;
alshabibb0285992016-03-28 23:30:37 -0700174
alshabib10580802015-02-18 18:30:33 -0800175 @Activate
sisubram4beea652017-08-09 10:38:14 +0000176 public void activate(ComponentContext context) {
alshabibb0285992016-03-28 23:30:37 -0700177 cfgService.registerProperties(getClass());
sisubram4beea652017-08-09 10:38:14 +0000178 modified(context);
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700179 KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
alshabibb0285992016-03-28 23:30:37 -0700180 .register(KryoNamespaces.API)
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700181 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
alshabibb0285992016-03-28 23:30:37 -0700182 .register(DefaultGroup.class,
183 DefaultGroupBucket.class,
184 DefaultGroupDescription.class,
185 DefaultGroupKey.class,
186 GroupDescription.Type.class,
187 Group.GroupState.class,
188 GroupBuckets.class,
alshabibb0285992016-03-28 23:30:37 -0700189 GroupStoreMessage.class,
190 GroupStoreMessage.Type.class,
191 UpdateType.class,
192 GroupStoreMessageSubjects.class,
193 MultiValuedTimestamp.class,
194 GroupStoreKeyMapKey.class,
195 GroupStoreIdMapKey.class,
196 GroupStoreMapKey.class
197 );
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700198
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700199 clusterMsgSerializer = kryoBuilder.build("GroupStore");
200 Serializer serializer = Serializer.using(clusterMsgSerializer);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700201
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700202 messageHandlingExecutor = Executors.
203 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
204 groupedThreads("onos/store/group",
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700205 "message-handlers",
206 log));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700207
208 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
alshabibb0285992016-03-28 23:30:37 -0700209 clusterMsgSerializer::deserialize,
210 this::process,
211 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700212
Madan Jampani0b847532016-03-03 13:44:15 -0800213 log.debug("Creating Consistent map onos-group-store-keymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700214
Madan Jampani0b847532016-03-03 13:44:15 -0800215 groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
216 .withName("onos-group-store-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700217 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700218 .build();
Frank Wange0eb5ce2016-07-01 18:21:25 +0800219 groupStoreEntriesByKey.addListener(mapListener);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700220 log.debug("Current size of groupstorekeymap:{}",
221 groupStoreEntriesByKey.size());
Thiago Santosfb73c502016-08-18 18:15:13 -0300222 synchronizeGroupStoreEntries();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700223
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530224 log.debug("Creating GroupStoreId Map From GroupStoreKey Map");
225 matchGroupEntries();
226 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/group", "store", log));
227 statusChangeListener = status -> {
228 if (status == Status.ACTIVE) {
229 executor.execute(this::matchGroupEntries);
230 }
231 };
232 groupStoreEntriesByKey.addStatusChangeListener(statusChangeListener);
233
Madan Jampani0b847532016-03-03 13:44:15 -0800234 log.debug("Creating Consistent map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700235
Madan Jampani0b847532016-03-03 13:44:15 -0800236 auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
237 .withName("onos-pending-group-keymap")
HIGUCHI Yuta3a84b322016-05-18 13:38:07 -0700238 .withSerializer(serializer)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700239 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700240 log.debug("Current size of pendinggroupkeymap:{}",
241 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242
helenyrwua1c41152016-08-18 16:16:14 -0700243 groupTopic = getOrCreateGroupTopic(serializer);
244 groupTopic.subscribe(this::processGroupMessage);
245
alshabib10580802015-02-18 18:30:33 -0800246 log.info("Started");
247 }
248
249 @Deactivate
250 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800251 groupStoreEntriesByKey.removeListener(mapListener);
alshabibb0285992016-03-28 23:30:37 -0700252 cfgService.unregisterProperties(getClass(), false);
HIGUCHI Yuta180d70f2015-10-01 16:13:56 -0700253 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
alshabib10580802015-02-18 18:30:33 -0800254 log.info("Stopped");
255 }
256
alshabibb0285992016-03-28 23:30:37 -0700257 @Modified
258 public void modified(ComponentContext context) {
259 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
260
261 try {
262 String s = get(properties, "garbageCollect");
263 garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
264
265 s = get(properties, "gcThresh");
266 gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530267
268 s = get(properties, "allowExtraneousGroups");
269 allowExtraneousGroups = isNullOrEmpty(s) ? ALLOW_EXTRANEOUS_GROUPS : Boolean.parseBoolean(s.trim());
alshabibb0285992016-03-28 23:30:37 -0700270 } catch (Exception e) {
271 gcThresh = GC_THRESH;
272 garbageCollect = GARBAGE_COLLECT;
Kavitha Alagesanc56cded2017-01-13 10:48:18 +0530273 allowExtraneousGroups = ALLOW_EXTRANEOUS_GROUPS;
alshabibb0285992016-03-28 23:30:37 -0700274 }
275 }
276
helenyrwua1c41152016-08-18 16:16:14 -0700277 private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
278 if (groupTopic == null) {
279 return storageService.getTopic("group-failover-notif", serializer);
280 } else {
281 return groupTopic;
282 }
Sho SHIMIZUa6285542017-01-12 15:08:24 -0800283 }
helenyrwua1c41152016-08-18 16:16:14 -0700284
alshabib10580802015-02-18 18:30:33 -0800285 /**
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530286 * Updating values of groupEntriesById.
287 */
288 private void matchGroupEntries() {
289 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupStoreEntriesByKey.asJavaMap().entrySet()) {
290 StoredGroupEntry group = entry.getValue();
291 getGroupIdTable(entry.getKey().deviceId()).put(group.id(), group);
292 }
293 }
294
Thiago Santosfb73c502016-08-18 18:15:13 -0300295
296 private void synchronizeGroupStoreEntries() {
297 Map<GroupStoreKeyMapKey, StoredGroupEntry> groupEntryMap = groupStoreEntriesByKey.asJavaMap();
298 for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : groupEntryMap.entrySet()) {
Thiago Santosfb73c502016-08-18 18:15:13 -0300299 StoredGroupEntry value = entry.getValue();
Thiago Santosfb73c502016-08-18 18:15:13 -0300300 ConcurrentMap<GroupId, StoredGroupEntry> groupIdTable = getGroupIdTable(value.deviceId());
301 groupIdTable.put(value.id(), value);
302 }
303 }
304
Kavitha Alagesanc884c3ef2017-01-19 12:32:26 +0530305 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700306 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800307 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700308 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800309 */
Madan Jampani0b847532016-03-03 13:44:15 -0800310 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700311 getGroupStoreKeyMap() {
Madan Jampani0b847532016-03-03 13:44:15 -0800312 return groupStoreEntriesByKey.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800313 }
314
315 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700316 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800317 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700318 * @param deviceId identifier of the device
319 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800320 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700321 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700322 return groupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800323 }
324
325 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700326 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800327 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700328 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800329 */
Madan Jampani0b847532016-03-03 13:44:15 -0800330 private Map<GroupStoreKeyMapKey, StoredGroupEntry>
alshabibb0285992016-03-28 23:30:37 -0700331 getPendingGroupKeyTable() {
Madan Jampani0b847532016-03-03 13:44:15 -0800332 return auditPendingReqQueue.asJavaMap();
alshabib10580802015-02-18 18:30:33 -0800333 }
334
335 /**
336 * Returns the extraneous group id table for specified device.
337 *
338 * @param deviceId identifier of the device
339 * @return Map representing group key table of given device.
340 */
341 private ConcurrentMap<GroupId, Group>
342 getExtraneousGroupIdTable(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700343 return extraneousGroupEntriesById.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
alshabib10580802015-02-18 18:30:33 -0800344 }
345
346 /**
347 * Returns the number of groups for the specified device in the store.
348 *
349 * @return number of groups for the specified device
350 */
351 @Override
352 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700353 return (getGroups(deviceId) != null) ?
alshabibb0285992016-03-28 23:30:37 -0700354 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800355 }
356
357 /**
358 * Returns the groups associated with a device.
359 *
360 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800361 * @return the group entries
362 */
363 @Override
364 public Iterable<Group> getGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800365 // Let ImmutableSet.copyOf do the type conversion
366 return ImmutableSet.copyOf(getStoredGroups(deviceId));
alshabib10580802015-02-18 18:30:33 -0800367 }
368
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700369 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
Charles Chanf4838a72015-12-07 18:13:45 -0800370 NodeId master = mastershipService.getMasterFor(deviceId);
371 if (master == null) {
372 log.debug("Failed to getGroups: No master for {}", deviceId);
373 return Collections.emptySet();
374 }
375
376 Set<StoredGroupEntry> storedGroups = getGroupStoreKeyMap().values()
377 .stream()
378 .filter(input -> input.deviceId().equals(deviceId))
379 .collect(Collectors.toSet());
380 return ImmutableSet.copyOf(storedGroups);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700381 }
382
alshabib10580802015-02-18 18:30:33 -0800383 /**
384 * Returns the stored group entry.
385 *
alshabibb0285992016-03-28 23:30:37 -0700386 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800387 * @param appCookie the group key
alshabib10580802015-02-18 18:30:33 -0800388 * @return a group associated with the key
389 */
390 @Override
391 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 return getStoredGroupEntry(deviceId, appCookie);
393 }
394
395 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
396 GroupKey appCookie) {
397 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
398 appCookie));
399 }
400
401 @Override
402 public Group getGroup(DeviceId deviceId, GroupId groupId) {
403 return getStoredGroupEntry(deviceId, groupId);
404 }
405
406 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
407 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700408 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800409 }
410
411 private int getFreeGroupIdValue(DeviceId deviceId) {
412 int freeId = groupIdGen.incrementAndGet();
413
414 while (true) {
Yi Tsengfa394de2017-02-01 11:26:40 -0800415 Group existing = getGroup(deviceId, new GroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800416 if (existing == null) {
417 existing = (
418 extraneousGroupEntriesById.get(deviceId) != null) ?
419 extraneousGroupEntriesById.get(deviceId).
Yi Tsengfa394de2017-02-01 11:26:40 -0800420 get(new GroupId(freeId)) :
alshabib10580802015-02-18 18:30:33 -0800421 null;
422 }
423 if (existing != null) {
424 freeId = groupIdGen.incrementAndGet();
425 } else {
426 break;
427 }
428 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700429 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800430 return freeId;
431 }
432
433 /**
434 * Stores a new group entry using the information from group description.
435 *
436 * @param groupDesc group description to be used to create group entry
437 */
438 @Override
439 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700440 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800441 // Check if a group is existing with the same key
Saurav Das8a0732e2015-11-20 15:27:53 -0800442 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
443 if (existingGroup != null) {
Saurav Dasc568c342018-01-25 09:49:01 -0800444 log.debug("Group already exists with the same key {} in dev:{} with id:0x{}",
Saurav Das8a0732e2015-11-20 15:27:53 -0800445 groupDesc.appCookie(), groupDesc.deviceId(),
446 Integer.toHexString(existingGroup.id().id()));
alshabib10580802015-02-18 18:30:33 -0800447 return;
448 }
449
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700450 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700451 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700452 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700453 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700454 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
Sivachidambaram Subramanian9f816de2017-06-13 07:16:54 +0530455 log.debug("No Master for device {}..."
456 + "Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700457 groupDesc.deviceId());
Sivachidambaram Subramanian9f816de2017-06-13 07:16:54 +0530458 addToPendingAudit(groupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700459 return;
460 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700461 GroupStoreMessage groupOp = GroupStoreMessage.
462 createGroupAddRequestMsg(groupDesc.deviceId(),
463 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700464
Madan Jampani175e8fd2015-05-20 14:10:45 -0700465 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700466 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
467 clusterMsgSerializer::serialize,
468 mastershipService.getMasterFor(groupDesc.deviceId()))
469 .whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700470 if (error != null) {
471 log.warn("Failed to send request to master: {} to {}",
alshabibb0285992016-03-28 23:30:37 -0700472 groupOp,
473 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700474 //TODO: Send Group operation failure event
475 } else {
476 log.debug("Sent Group operation request for device {} "
alshabibb0285992016-03-28 23:30:37 -0700477 + "to remote MASTER {}",
478 groupDesc.deviceId(),
479 mastershipService.getMasterFor(groupDesc.deviceId()));
Madan Jampani175e8fd2015-05-20 14:10:45 -0700480 }
481 });
alshabib10580802015-02-18 18:30:33 -0800482 return;
483 }
484
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700485 log.debug("Store group for device {} is getting handled locally",
486 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800487 storeGroupDescriptionInternal(groupDesc);
488 }
489
Sivachidambaram Subramanian9f816de2017-06-13 07:16:54 +0530490 private void addToPendingAudit(GroupDescription groupDesc) {
491 Integer groupIdVal = groupDesc.givenGroupId();
492 GroupId groupId = (groupIdVal != null) ? new GroupId(groupIdVal) : dummyGroupId;
493 addToPendingKeyTable(new DefaultGroup(groupId, groupDesc));
494 }
495
496 private void addToPendingKeyTable(StoredGroupEntry group) {
497 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
498 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
499 getPendingGroupKeyTable();
500 pendingKeyTable.put(new GroupStoreKeyMapKey(group.deviceId(),
501 group.appCookie()),
502 group);
503 }
504
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700505 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
506 ConcurrentMap<GroupId, Group> extraneousMap =
507 extraneousGroupEntriesById.get(deviceId);
508 if (extraneousMap == null) {
509 return null;
510 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800511 return extraneousMap.get(new GroupId(groupId));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700512 }
513
514 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
515 GroupBuckets buckets) {
516 ConcurrentMap<GroupId, Group> extraneousMap =
517 extraneousGroupEntriesById.get(deviceId);
518 if (extraneousMap == null) {
519 return null;
520 }
521
alshabibb0285992016-03-28 23:30:37 -0700522 for (Group extraneousGroup : extraneousMap.values()) {
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700523 if (extraneousGroup.buckets().equals(buckets)) {
524 return extraneousGroup;
525 }
526 }
527 return null;
528 }
529
alshabib10580802015-02-18 18:30:33 -0800530 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
531 // Check if a group is existing with the same key
532 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
533 return;
534 }
535
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700536 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
537 // Device group audit has not completed yet
538 // Add this group description to pending group key table
539 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700540 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
alshabibb0285992016-03-28 23:30:37 -0700541 groupDesc.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700542 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
543 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Madan Jampani0b847532016-03-03 13:44:15 -0800544 Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700545 getPendingGroupKeyTable();
546 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
547 groupDesc.appCookie()),
548 group);
549 return;
550 }
551
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700552 Group matchingExtraneousGroup = null;
553 if (groupDesc.givenGroupId() != null) {
554 //Check if there is a extraneous group existing with the same Id
555 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
alshabibb0285992016-03-28 23:30:37 -0700556 groupDesc.deviceId(), groupDesc.givenGroupId());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700557 if (matchingExtraneousGroup != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800558 log.debug("storeGroupDescriptionInternal: Matching extraneous group "
alshabibb0285992016-03-28 23:30:37 -0700559 + "found in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700560 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800561 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700562 //Check if the group buckets matches with user provided buckets
563 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
564 //Group is already existing with the same buckets and Id
565 // Create a group entry object
Saurav Das0fd79d92016-03-07 10:58:36 -0800566 log.debug("storeGroupDescriptionInternal: Buckets also matching "
alshabibb0285992016-03-28 23:30:37 -0700567 + "in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700568 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800569 Integer.toHexString(groupDesc.givenGroupId()));
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700570 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700571 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700572 // Insert the newly created group entry into key and id maps
573 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700574 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
575 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700576 // Ensure it also inserted into group id based table to
577 // avoid any chances of duplication in group id generation
578 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700579 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700580 addOrUpdateGroupEntry(matchingExtraneousGroup);
581 removeExtraneousGroupEntry(matchingExtraneousGroup);
582 return;
583 } else {
584 //Group buckets are not matching. Update group
585 //with user provided buckets.
Saurav Das0fd79d92016-03-07 10:58:36 -0800586 log.debug("storeGroupDescriptionInternal: Buckets are not "
alshabibb0285992016-03-28 23:30:37 -0700587 + "matching in Device {} for group id 0x{}",
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700588 groupDesc.deviceId(),
Saurav Das0fd79d92016-03-07 10:58:36 -0800589 Integer.toHexString(groupDesc.givenGroupId()));
590 StoredGroupEntry modifiedGroup = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700591 matchingExtraneousGroup.id(), groupDesc);
Saurav Das0fd79d92016-03-07 10:58:36 -0800592 modifiedGroup.setState(GroupState.PENDING_UPDATE);
593 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700594 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
595 groupDesc.appCookie()), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800596 // Ensure it also inserted into group id based table to
597 // avoid any chances of duplication in group id generation
598 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700599 put(matchingExtraneousGroup.id(), modifiedGroup);
Saurav Das0fd79d92016-03-07 10:58:36 -0800600 removeExtraneousGroupEntry(matchingExtraneousGroup);
601 log.debug("storeGroupDescriptionInternal: Triggering Group "
alshabibb0285992016-03-28 23:30:37 -0700602 + "UPDATE request for {} in device {}",
Saurav Das0fd79d92016-03-07 10:58:36 -0800603 matchingExtraneousGroup.id(),
604 groupDesc.deviceId());
605 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
606 return;
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700607 }
608 }
609 } else {
610 //Check if there is an extraneous group with user provided buckets
611 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
alshabibb0285992016-03-28 23:30:37 -0700612 groupDesc.deviceId(), groupDesc.buckets());
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700613 if (matchingExtraneousGroup != null) {
614 //Group is already existing with the same buckets.
615 //So reuse this group.
616 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
617 groupDesc.deviceId());
618 //Create a group entry object
619 StoredGroupEntry group = new DefaultGroup(
alshabibb0285992016-03-28 23:30:37 -0700620 matchingExtraneousGroup.id(), groupDesc);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700621 // Insert the newly created group entry into key and id maps
622 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700623 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
624 groupDesc.appCookie()), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700625 // Ensure it also inserted into group id based table to
626 // avoid any chances of duplication in group id generation
627 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700628 put(matchingExtraneousGroup.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700629 addOrUpdateGroupEntry(matchingExtraneousGroup);
630 removeExtraneousGroupEntry(matchingExtraneousGroup);
631 return;
632 } else {
633 //TODO: Check if there are any empty groups that can be used here
634 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
635 groupDesc.deviceId());
636 }
637 }
638
Saurav Das100e3b82015-04-30 11:12:10 -0700639 GroupId id = null;
640 if (groupDesc.givenGroupId() == null) {
641 // Get a new group identifier
Yi Tsengfa394de2017-02-01 11:26:40 -0800642 id = new GroupId(getFreeGroupIdValue(groupDesc.deviceId()));
Saurav Das100e3b82015-04-30 11:12:10 -0700643 } else {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800644 // we need to use the identifier passed in by caller, but check if
645 // already used
646 Group existing = getGroup(groupDesc.deviceId(),
Yi Tsengfa394de2017-02-01 11:26:40 -0800647 new GroupId(groupDesc.givenGroupId()));
Saurav Das8be4e3a2016-03-11 17:19:07 -0800648 if (existing != null) {
649 log.warn("Group already exists with the same id: 0x{} in dev:{} "
alshabibb0285992016-03-28 23:30:37 -0700650 + "but with different key: {} (request gkey: {})",
651 Integer.toHexString(groupDesc.givenGroupId()),
652 groupDesc.deviceId(),
653 existing.appCookie(),
654 groupDesc.appCookie());
Saurav Das8be4e3a2016-03-11 17:19:07 -0800655 return;
656 }
Yi Tsengfa394de2017-02-01 11:26:40 -0800657 id = new GroupId(groupDesc.givenGroupId());
Saurav Das100e3b82015-04-30 11:12:10 -0700658 }
alshabib10580802015-02-18 18:30:33 -0800659 // Create a group entry object
660 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700661 // Insert the newly created group entry into key and id maps
662 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700663 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
664 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700665 // Ensure it also inserted into group id based table to
666 // avoid any chances of duplication in group id generation
667 getGroupIdTable(groupDesc.deviceId()).
alshabibb0285992016-03-28 23:30:37 -0700668 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700669 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700670 id,
671 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800672 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
673 group));
674 }
675
676 /**
677 * Updates the existing group entry with the information
678 * from group description.
679 *
alshabibb0285992016-03-28 23:30:37 -0700680 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800681 * @param oldAppCookie the current group key
alshabibb0285992016-03-28 23:30:37 -0700682 * @param type update type
683 * @param newBuckets group buckets for updates
alshabib10580802015-02-18 18:30:33 -0800684 * @param newAppCookie optional new group key
685 */
686 @Override
687 public void updateGroupDescription(DeviceId deviceId,
688 GroupKey oldAppCookie,
689 UpdateType type,
690 GroupBuckets newBuckets,
691 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700692 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700693 if (mastershipService.getMasterFor(deviceId) != null &&
694 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700695 log.debug("updateGroupDescription: Device {} local role is not MASTER",
696 deviceId);
697 if (mastershipService.getMasterFor(deviceId) == null) {
698 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700699 + "Can not perform update group operation",
700 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700701 //TODO: Send Group operation failure event
702 return;
703 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700704 GroupStoreMessage groupOp = GroupStoreMessage.
705 createGroupUpdateRequestMsg(deviceId,
706 oldAppCookie,
707 type,
708 newBuckets,
709 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700710
Madan Jampani175e8fd2015-05-20 14:10:45 -0700711 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700712 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
713 clusterMsgSerializer::serialize,
714 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
715 if (error != null) {
716 log.warn("Failed to send request to master: {} to {}",
717 groupOp,
718 mastershipService.getMasterFor(deviceId), error);
719 }
720 //TODO: Send Group operation failure event
721 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700722 return;
723 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700724 log.debug("updateGroupDescription for device {} is getting handled locally",
725 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700726 updateGroupDescriptionInternal(deviceId,
727 oldAppCookie,
728 type,
729 newBuckets,
730 newAppCookie);
731 }
732
733 private void updateGroupDescriptionInternal(DeviceId deviceId,
alshabibb0285992016-03-28 23:30:37 -0700734 GroupKey oldAppCookie,
735 UpdateType type,
736 GroupBuckets newBuckets,
737 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800738 // Check if a group is existing with the provided key
739 Group oldGroup = getGroup(deviceId, oldAppCookie);
740 if (oldGroup == null) {
Saurav Das8be4e3a2016-03-11 17:19:07 -0800741 log.warn("updateGroupDescriptionInternal: Group not found...strange. "
alshabibb0285992016-03-28 23:30:37 -0700742 + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
alshabib10580802015-02-18 18:30:33 -0800743 return;
744 }
745
746 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
747 type,
748 newBuckets);
749 if (newBucketList != null) {
750 // Create a new group object from the old group
751 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
752 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
753 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
754 oldGroup.deviceId(),
755 oldGroup.type(),
756 updatedBuckets,
757 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700758 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800759 oldGroup.appId());
760 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
761 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700762 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
alshabibb0285992016-03-28 23:30:37 -0700763 oldGroup.id(),
764 oldGroup.deviceId(),
765 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800766 newGroup.setState(GroupState.PENDING_UPDATE);
767 newGroup.setLife(oldGroup.life());
768 newGroup.setPackets(oldGroup.packets());
769 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700770 //Update the group entry in groupkey based map.
771 //Update to groupid based map will happen in the
772 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700773 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
774 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700775 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700776 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
777 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800778 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700779 } else {
780 log.warn("updateGroupDescriptionInternal with type {}: No "
alshabibb0285992016-03-28 23:30:37 -0700781 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800782 }
783 }
784
785 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
786 UpdateType type,
787 GroupBuckets buckets) {
Victor Silva0282ab82016-11-15 16:30:27 -0300788 if (type == UpdateType.SET) {
789 return buckets.buckets();
790 }
791
Victor Silvadf1eeae2016-08-12 15:28:57 -0300792 List<GroupBucket> oldBuckets = oldGroup.buckets().buckets();
793 List<GroupBucket> updatedBucketList = new ArrayList<>();
alshabib10580802015-02-18 18:30:33 -0800794 boolean groupDescUpdated = false;
795
796 if (type == UpdateType.ADD) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300797 List<GroupBucket> newBuckets = buckets.buckets();
798
799 // Add old buckets that will not be updated and check if any will be updated.
800 for (GroupBucket oldBucket : oldBuckets) {
801 int newBucketIndex = newBuckets.indexOf(oldBucket);
802
803 if (newBucketIndex != -1) {
804 GroupBucket newBucket = newBuckets.get(newBucketIndex);
805 if (!newBucket.hasSameParameters(oldBucket)) {
806 // Bucket will be updated
807 groupDescUpdated = true;
808 }
809 } else {
810 // Old bucket will remain the same - add it.
811 updatedBucketList.add(oldBucket);
alshabib10580802015-02-18 18:30:33 -0800812 }
813 }
Victor Silvadf1eeae2016-08-12 15:28:57 -0300814
815 // Add all new buckets
816 updatedBucketList.addAll(newBuckets);
817 if (!oldBuckets.containsAll(newBuckets)) {
818 groupDescUpdated = true;
819 }
820
alshabib10580802015-02-18 18:30:33 -0800821 } else if (type == UpdateType.REMOVE) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300822 List<GroupBucket> bucketsToRemove = buckets.buckets();
823
824 // Check which old buckets should remain
825 for (GroupBucket oldBucket : oldBuckets) {
826 if (!bucketsToRemove.contains(oldBucket)) {
827 updatedBucketList.add(oldBucket);
828 } else {
alshabib10580802015-02-18 18:30:33 -0800829 groupDescUpdated = true;
830 }
831 }
832 }
833
834 if (groupDescUpdated) {
Victor Silvadf1eeae2016-08-12 15:28:57 -0300835 return updatedBucketList;
alshabib10580802015-02-18 18:30:33 -0800836 } else {
837 return null;
838 }
839 }
840
841 /**
842 * Triggers deleting the existing group entry.
843 *
alshabibb0285992016-03-28 23:30:37 -0700844 * @param deviceId the device ID
alshabib10580802015-02-18 18:30:33 -0800845 * @param appCookie the group key
846 */
847 @Override
848 public void deleteGroupDescription(DeviceId deviceId,
849 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700850 // Check if group to be deleted by a remote instance
851 if (mastershipService.
852 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700853 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
854 deviceId);
855 if (mastershipService.getMasterFor(deviceId) == null) {
856 log.error("No Master for device {}..."
alshabibb0285992016-03-28 23:30:37 -0700857 + "Can not perform delete group operation",
858 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700859 //TODO: Send Group operation failure event
860 return;
861 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700862 GroupStoreMessage groupOp = GroupStoreMessage.
863 createGroupDeleteRequestMsg(deviceId,
864 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700865
Madan Jampani175e8fd2015-05-20 14:10:45 -0700866 clusterCommunicator.unicast(groupOp,
alshabibb0285992016-03-28 23:30:37 -0700867 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
868 clusterMsgSerializer::serialize,
869 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
870 if (error != null) {
871 log.warn("Failed to send request to master: {} to {}",
872 groupOp,
873 mastershipService.getMasterFor(deviceId), error);
874 }
875 //TODO: Send Group operation failure event
876 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700877 return;
878 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700879 log.debug("deleteGroupDescription in device {} is getting handled locally",
880 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700881 deleteGroupDescriptionInternal(deviceId, appCookie);
882 }
883
884 private void deleteGroupDescriptionInternal(DeviceId deviceId,
885 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800886 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700887 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800888 if (existing == null) {
889 return;
890 }
891
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700892 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
alshabibb0285992016-03-28 23:30:37 -0700893 existing.id(),
894 existing.deviceId(),
895 existing.state());
alshabib10580802015-02-18 18:30:33 -0800896 synchronized (existing) {
897 existing.setState(GroupState.PENDING_DELETE);
Saurav Das80980c72016-03-23 11:22:49 -0700898 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700899 put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
900 existing);
alshabib10580802015-02-18 18:30:33 -0800901 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700902 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
903 deviceId);
alshabib10580802015-02-18 18:30:33 -0800904 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
905 }
906
907 /**
908 * Stores a new group entry, or updates an existing entry.
909 *
910 * @param group group entry
911 */
912 @Override
913 public void addOrUpdateGroupEntry(Group group) {
914 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700915 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
916 group.id());
alshabib10580802015-02-18 18:30:33 -0800917 GroupEvent event = null;
918
919 if (existing != null) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800920 log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700921 group.id(),
922 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800923 synchronized (existing) {
alshabibb0285992016-03-28 23:30:37 -0700924 for (GroupBucket bucket : group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700925 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700926 existing.buckets().buckets()
alshabibb0285992016-03-28 23:30:37 -0700927 .stream()
928 .filter((existingBucket) -> (existingBucket.equals(bucket)))
929 .findFirst();
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700930 if (matchingBucket.isPresent()) {
931 ((StoredGroupBucketEntry) matchingBucket.
932 get()).setPackets(bucket.packets());
933 ((StoredGroupBucketEntry) matchingBucket.
934 get()).setBytes(bucket.bytes());
935 } else {
936 log.warn("addOrUpdateGroupEntry: No matching "
alshabibb0285992016-03-28 23:30:37 -0700937 + "buckets to update stats");
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700938 }
939 }
alshabib10580802015-02-18 18:30:33 -0800940 existing.setLife(group.life());
941 existing.setPackets(group.packets());
942 existing.setBytes(group.bytes());
alshabibb0285992016-03-28 23:30:37 -0700943 existing.setReferenceCount(group.referenceCount());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700944 if ((existing.state() == GroupState.PENDING_ADD) ||
alshabibb0285992016-03-28 23:30:37 -0700945 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Saurav Das0fd79d92016-03-07 10:58:36 -0800946 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700947 existing.id(),
948 existing.deviceId(),
949 existing.state());
alshabib10580802015-02-18 18:30:33 -0800950 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700951 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800952 event = new GroupEvent(Type.GROUP_ADDED, existing);
953 } else {
Saurav Das0fd79d92016-03-07 10:58:36 -0800954 log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
alshabibb0285992016-03-28 23:30:37 -0700955 existing.id(),
956 existing.deviceId(),
957 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700958 existing.setState(GroupState.ADDED);
959 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800960 event = new GroupEvent(Type.GROUP_UPDATED, existing);
961 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700962 //Re-PUT map entries to trigger map update events
963 getGroupStoreKeyMap().
alshabibb0285992016-03-28 23:30:37 -0700964 put(new GroupStoreKeyMapKey(existing.deviceId(),
965 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800966 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700967 } else {
968 log.warn("addOrUpdateGroupEntry: Group update "
alshabibb0285992016-03-28 23:30:37 -0700969 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800970 }
971
972 if (event != null) {
973 notifyDelegate(event);
974 }
975 }
976
977 /**
978 * Removes the group entry from store.
979 *
980 * @param group group entry
981 */
982 @Override
983 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700984 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
985 group.id());
alshabib10580802015-02-18 18:30:33 -0800986
987 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700988 log.debug("removeGroupEntry: removing group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -0700989 group.id(),
990 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700991 //Removal from groupid based map will happen in the
992 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700993 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
994 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800995 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700996 } else {
997 log.warn("removeGroupEntry for {} in device{} is "
alshabibb0285992016-03-28 23:30:37 -0700998 + "not existing in our maps",
999 group.id(),
1000 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001001 }
1002 }
1003
Victor Silva4e8b7832016-08-17 17:11:19 -03001004 private void purgeGroupEntries(Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entries) {
1005 entries.forEach(entry -> {
1006 groupStoreEntriesByKey.remove(entry.getKey());
1007 });
1008 }
1009
alshabib10580802015-02-18 18:30:33 -08001010 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -08001011 public void purgeGroupEntry(DeviceId deviceId) {
Victor Silva4e8b7832016-08-17 17:11:19 -03001012 Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
Charles Chan0c7c43b2016-01-14 17:39:20 -08001013 new HashSet<>();
1014
Madan Jampani0b847532016-03-03 13:44:15 -08001015 getGroupStoreKeyMap().entrySet().stream()
Charles Chan0c7c43b2016-01-14 17:39:20 -08001016 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Victor Silva4e8b7832016-08-17 17:11:19 -03001017 .forEach(entriesPendingRemove::add);
Charles Chan0c7c43b2016-01-14 17:39:20 -08001018
Victor Silva4e8b7832016-08-17 17:11:19 -03001019 purgeGroupEntries(entriesPendingRemove);
1020 }
1021
1022 @Override
1023 public void purgeGroupEntries() {
1024 purgeGroupEntries(getGroupStoreKeyMap().entrySet());
Charles Chan0c7c43b2016-01-14 17:39:20 -08001025 }
1026
1027 @Override
alshabib10580802015-02-18 18:30:33 -08001028 public void deviceInitialAuditCompleted(DeviceId deviceId,
1029 boolean completed) {
1030 synchronized (deviceAuditStatus) {
1031 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001032 log.debug("AUDIT completed for device {}",
1033 deviceId);
alshabib10580802015-02-18 18:30:33 -08001034 deviceAuditStatus.put(deviceId, true);
1035 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001036 List<StoredGroupEntry> pendingGroupRequests =
1037 getPendingGroupKeyTable().values()
alshabibb0285992016-03-28 23:30:37 -07001038 .stream()
1039 .filter(g -> g.deviceId().equals(deviceId))
1040 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001041 log.debug("processing pending group add requests for device {} and number of pending requests {}",
alshabibb0285992016-03-28 23:30:37 -07001042 deviceId,
1043 pendingGroupRequests.size());
1044 for (Group group : pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -08001045 GroupDescription tmp = new DefaultGroupDescription(
1046 group.deviceId(),
1047 group.type(),
1048 group.buckets(),
1049 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -07001050 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -08001051 group.appId());
1052 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001053 getPendingGroupKeyTable().
alshabibb0285992016-03-28 23:30:37 -07001054 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -08001055 }
alshabib10580802015-02-18 18:30:33 -08001056 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001057 Boolean audited = deviceAuditStatus.get(deviceId);
1058 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001059 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -08001060 deviceAuditStatus.put(deviceId, false);
1061 }
1062 }
1063 }
1064 }
1065
1066 @Override
1067 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
1068 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -07001069 Boolean audited = deviceAuditStatus.get(deviceId);
1070 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -08001071 }
1072 }
1073
1074 @Override
1075 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
1076
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001077 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
1078 operation.groupId());
alshabib10580802015-02-18 18:30:33 -08001079
1080 if (existing == null) {
1081 log.warn("No group entry with ID {} found ", operation.groupId());
1082 return;
1083 }
1084
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001085 log.warn("groupOperationFailed: group operation {} failed"
alshabibb0285992016-03-28 23:30:37 -07001086 + "for group {} in device {} with code {}",
1087 operation.opType(),
1088 existing.id(),
1089 existing.deviceId(),
1090 operation.failureCode());
Saurav Das0fd79d92016-03-07 10:58:36 -08001091 if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
Saurav Das8be4e3a2016-03-11 17:19:07 -08001092 if (operation.buckets().equals(existing.buckets())) {
Saurav Dasc88d4662017-05-15 15:34:25 -07001093 if (existing.state() == GroupState.PENDING_ADD ||
1094 existing.state() == GroupState.PENDING_ADD_RETRY) {
Saurav Das8be4e3a2016-03-11 17:19:07 -08001095 log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
alshabibb0285992016-03-28 23:30:37 -07001096 + "add state - moving to ADDED for group {} in device {}",
1097 existing.id(), deviceId);
Saurav Das8be4e3a2016-03-11 17:19:07 -08001098 addOrUpdateGroupEntry(existing);
1099 return;
1100 } else {
Saurav Dasc88d4662017-05-15 15:34:25 -07001101 log.warn("GROUP_EXISTS: GroupId and Buckets match but existing"
1102 + "group in state: {}", existing.state());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001103 }
Saurav Dasc88d4662017-05-15 15:34:25 -07001104 } else {
1105 log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
1106 + "Operation: {} Existing: {}", operation.buckets(),
1107 existing.buckets());
Saurav Das8be4e3a2016-03-11 17:19:07 -08001108 }
Saurav Das0fd79d92016-03-07 10:58:36 -08001109 }
alshabib10580802015-02-18 18:30:33 -08001110 switch (operation.opType()) {
1111 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001112 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001113 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
1114 log.warn("groupOperationFailed: cleaningup "
alshabibb0285992016-03-28 23:30:37 -07001115 + "group {} from store in device {}....",
1116 existing.id(),
1117 existing.deviceId());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001118 //Removal from groupid based map will happen in the
1119 //map update listener
1120 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
1121 existing.appCookie()));
1122 }
alshabib10580802015-02-18 18:30:33 -08001123 break;
1124 case MODIFY:
1125 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
1126 break;
1127 case DELETE:
1128 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
1129 break;
1130 default:
1131 log.warn("Unknown group operation type {}", operation.opType());
1132 }
alshabib10580802015-02-18 18:30:33 -08001133 }
1134
1135 @Override
1136 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001137 log.debug("add/update extraneous group entry {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001138 group.id(),
1139 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001140 ConcurrentMap<GroupId, Group> extraneousIdTable =
1141 getExtraneousGroupIdTable(group.deviceId());
1142 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -07001143 // Don't remove the extraneous groups, instead re-use it when
1144 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -08001145 }
1146
1147 @Override
1148 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001149 log.debug("remove extraneous group entry {} of device {} from store",
alshabibb0285992016-03-28 23:30:37 -07001150 group.id(),
1151 group.deviceId());
alshabib10580802015-02-18 18:30:33 -08001152 ConcurrentMap<GroupId, Group> extraneousIdTable =
1153 getExtraneousGroupIdTable(group.deviceId());
1154 extraneousIdTable.remove(group.id());
1155 }
1156
1157 @Override
1158 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
1159 // flatten and make iterator unmodifiable
1160 return FluentIterable.from(
1161 getExtraneousGroupIdTable(deviceId).values());
1162 }
1163
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001164 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001165 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001166 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001167 private class GroupStoreKeyMapListener implements
Madan Jampani0b847532016-03-03 13:44:15 -08001168 MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001169
1170 @Override
Madan Jampani0b847532016-03-03 13:44:15 -08001171 public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001172 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001173 GroupStoreKeyMapKey key = mapEvent.key();
Madan Jampani0b847532016-03-03 13:44:15 -08001174 StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001175 if ((key == null) && (group == null)) {
1176 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001177 + "event {} with null entry", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001178 return;
1179 } else if (group == null) {
1180 group = getGroupIdTable(key.deviceId()).values()
1181 .stream()
1182 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
Yuta HIGUCHI6e5f4702016-11-21 11:42:11 -08001183 .findFirst().orElse(null);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001184 if (group == null) {
1185 log.error("GroupStoreKeyMapListener: Received "
alshabibb0285992016-03-28 23:30:37 -07001186 + "event {} with null entry... can not process", mapEvent.type());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001187 return;
1188 }
1189 }
1190 log.trace("received groupid map event {} for id {} in device {}",
1191 mapEvent.type(),
1192 group.id(),
jaegonkim68e080c2016-12-01 22:31:01 +09001193 (key != null ? key.deviceId() : null));
Madan Jampani0b847532016-03-03 13:44:15 -08001194 if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001195 // Update the group ID table
1196 getGroupIdTable(group.deviceId()).put(group.id(), group);
Madan Jampani0b847532016-03-03 13:44:15 -08001197 StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
1198 if (value.state() == Group.GroupState.ADDED) {
1199 if (value.isGroupStateAddedFirstTime()) {
1200 groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001201 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001202 group.id(),
1203 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001204 } else {
Madan Jampani0b847532016-03-03 13:44:15 -08001205 groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001206 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
alshabibb0285992016-03-28 23:30:37 -07001207 group.id(),
1208 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001209 }
1210 }
Madan Jampani0b847532016-03-03 13:44:15 -08001211 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001212 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001213 // Remove the entry from the group ID table
1214 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001215 }
1216
1217 if (groupEvent != null) {
1218 notifyDelegate(groupEvent);
1219 }
1220 }
1221 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001222
helenyrwua1c41152016-08-18 16:16:14 -07001223 private void processGroupMessage(GroupStoreMessage message) {
1224 if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1225 // FIXME: groupStoreEntriesByKey inaccessible here
1226 getGroupIdTable(message.deviceId()).values()
1227 .stream()
1228 .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1229 .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1230 }
1231 }
1232
Madan Jampani01e05fb2015-08-13 13:29:36 -07001233 private void process(GroupStoreMessage groupOp) {
1234 log.debug("Received remote group operation {} request for device {}",
alshabibb0285992016-03-28 23:30:37 -07001235 groupOp.type(),
1236 groupOp.deviceId());
1237 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1238 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1239 return;
1240 }
1241 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1242 storeGroupDescriptionInternal(groupOp.groupDesc());
1243 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1244 updateGroupDescriptionInternal(groupOp.deviceId(),
1245 groupOp.appCookie(),
1246 groupOp.updateType(),
1247 groupOp.updateBuckets(),
1248 groupOp.newAppCookie());
1249 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1250 deleteGroupDescriptionInternal(groupOp.deviceId(),
1251 groupOp.appCookie());
1252 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001253 }
1254
1255 /**
1256 * Flattened map key to be used to store group entries.
1257 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001258 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001259 private final DeviceId deviceId;
1260
1261 public GroupStoreMapKey(DeviceId deviceId) {
1262 this.deviceId = deviceId;
1263 }
1264
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001265 public DeviceId deviceId() {
1266 return deviceId;
1267 }
1268
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001269 @Override
1270 public boolean equals(Object o) {
1271 if (this == o) {
1272 return true;
1273 }
1274 if (!(o instanceof GroupStoreMapKey)) {
1275 return false;
1276 }
1277 GroupStoreMapKey that = (GroupStoreMapKey) o;
1278 return this.deviceId.equals(that.deviceId);
1279 }
1280
1281 @Override
1282 public int hashCode() {
1283 int result = 17;
1284
1285 result = 31 * result + Objects.hash(this.deviceId);
1286
1287 return result;
1288 }
1289 }
1290
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001291 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001292 private final GroupKey appCookie;
alshabibb0285992016-03-28 23:30:37 -07001293
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001294 public GroupStoreKeyMapKey(DeviceId deviceId,
1295 GroupKey appCookie) {
1296 super(deviceId);
1297 this.appCookie = appCookie;
1298 }
1299
1300 @Override
1301 public boolean equals(Object o) {
1302 if (this == o) {
1303 return true;
1304 }
1305 if (!(o instanceof GroupStoreKeyMapKey)) {
1306 return false;
1307 }
1308 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1309 return (super.equals(that) &&
1310 this.appCookie.equals(that.appCookie));
1311 }
1312
1313 @Override
1314 public int hashCode() {
1315 int result = 17;
1316
1317 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1318
1319 return result;
1320 }
1321 }
1322
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001323 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001324 private final GroupId groupId;
alshabibb0285992016-03-28 23:30:37 -07001325
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001326 public GroupStoreIdMapKey(DeviceId deviceId,
1327 GroupId groupId) {
1328 super(deviceId);
1329 this.groupId = groupId;
1330 }
1331
1332 @Override
1333 public boolean equals(Object o) {
1334 if (this == o) {
1335 return true;
1336 }
1337 if (!(o instanceof GroupStoreIdMapKey)) {
1338 return false;
1339 }
1340 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1341 return (super.equals(that) &&
1342 this.groupId.equals(that.groupId));
1343 }
1344
1345 @Override
1346 public int hashCode() {
1347 int result = 17;
1348
1349 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1350
1351 return result;
1352 }
1353 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001354
1355 @Override
1356 public void pushGroupMetrics(DeviceId deviceId,
1357 Collection<Group> groupEntries) {
1358 boolean deviceInitialAuditStatus =
1359 deviceInitialAuditStatus(deviceId);
1360 Set<Group> southboundGroupEntries =
1361 Sets.newHashSet(groupEntries);
1362 Set<StoredGroupEntry> storedGroupEntries =
1363 Sets.newHashSet(getStoredGroups(deviceId));
1364 Set<Group> extraneousStoredEntries =
1365 Sets.newHashSet(getExtraneousGroups(deviceId));
1366
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001367 if (log.isTraceEnabled()) {
1368 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1369 southboundGroupEntries.size(),
1370 deviceId);
1371 for (Group group : southboundGroupEntries) {
1372 log.trace("Group {} in device {}", group, deviceId);
1373 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001374
Sho SHIMIZU695bac62016-08-15 12:41:59 -07001375 log.trace("Displaying all ({}) stored group entries for device {}",
1376 storedGroupEntries.size(),
1377 deviceId);
1378 for (StoredGroupEntry group : storedGroupEntries) {
1379 log.trace("Stored Group {} for device {}", group, deviceId);
1380 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001381 }
1382
alshabibb0285992016-03-28 23:30:37 -07001383 garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
1384
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001385 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1386 Group group = it2.next();
1387 if (storedGroupEntries.remove(group)) {
1388 // we both have the group, let's update some info then.
1389 log.trace("Group AUDIT: group {} exists in both planes for device {}",
alshabibb0285992016-03-28 23:30:37 -07001390 group.id(), deviceId);
1391
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001392 groupAdded(group);
1393 it2.remove();
1394 }
1395 }
1396 for (Group group : southboundGroupEntries) {
1397 if (getGroup(group.deviceId(), group.id()) != null) {
1398 // There is a group existing with the same id
1399 // It is possible that group update is
1400 // in progress while we got a stale info from switch
1401 if (!storedGroupEntries.remove(getGroup(
alshabibb0285992016-03-28 23:30:37 -07001402 group.deviceId(), group.id()))) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001403 log.warn("Group AUDIT: Inconsistent state:"
alshabibb0285992016-03-28 23:30:37 -07001404 + "Group exists in ID based table while "
1405 + "not present in key based table");
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001406 }
1407 } else {
1408 // there are groups in the switch that aren't in the store
1409 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001410 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001411 extraneousStoredEntries.remove(group);
Kavitha Alagesanc56cded2017-01-13 10:48:18 +05301412 if (allowExtraneousGroups) {
1413 extraneousGroup(group);
1414 } else {
1415 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
1416 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001417 }
1418 }
Charles Chan07f15f22018-05-08 21:35:50 -07001419 for (StoredGroupEntry group : storedGroupEntries) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001420 // there are groups in the store that aren't in the switch
1421 log.debug("Group AUDIT: group {} missing in data plane for device {}",
alshabibb0285992016-03-28 23:30:37 -07001422 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001423 groupMissing(group);
1424 }
1425 for (Group group : extraneousStoredEntries) {
1426 // there are groups in the extraneous store that
1427 // aren't in the switch
Saurav Das0fd79d92016-03-07 10:58:36 -08001428 log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
alshabibb0285992016-03-28 23:30:37 -07001429 group.id(), deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001430 removeExtraneousGroupEntry(group);
1431 }
1432
1433 if (!deviceInitialAuditStatus) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001434 log.info("Group AUDIT: Setting device {} initial AUDIT completed",
alshabibb0285992016-03-28 23:30:37 -07001435 deviceId);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001436 deviceInitialAuditCompleted(deviceId, true);
1437 }
1438 }
1439
helenyrwu89470f12016-08-12 13:18:10 -07001440 @Override
1441 public void notifyOfFailovers(Collection<Group> failoverGroups) {
helenyrwu89470f12016-08-12 13:18:10 -07001442 failoverGroups.forEach(group -> {
1443 if (group.type() == Group.Type.FAILOVER) {
helenyrwua1c41152016-08-18 16:16:14 -07001444 groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1445 group.deviceId(), group));
helenyrwu89470f12016-08-12 13:18:10 -07001446 }
1447 });
helenyrwu89470f12016-08-12 13:18:10 -07001448 }
1449
alshabibb0285992016-03-28 23:30:37 -07001450 private void garbageCollect(DeviceId deviceId,
1451 Set<Group> southboundGroupEntries,
1452 Set<StoredGroupEntry> storedGroupEntries) {
1453 if (!garbageCollect) {
1454 return;
1455 }
1456
1457 Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
1458 while (it.hasNext()) {
1459 StoredGroupEntry group = it.next();
1460 if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
1461 log.debug("Garbage collecting group {} on {}", group, deviceId);
1462 deleteGroupDescription(deviceId, group.appCookie());
1463 southboundGroupEntries.remove(group);
1464 it.remove();
1465 }
1466 }
1467 }
1468
1469 private boolean checkGroupRefCount(Group group) {
1470 return (group.referenceCount() == 0 && group.age() >= gcThresh);
1471 }
1472
Charles Chan07f15f22018-05-08 21:35:50 -07001473 private void groupMissing(StoredGroupEntry group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001474 switch (group.state()) {
1475 case PENDING_DELETE:
1476 log.debug("Group {} delete confirmation from device {}",
1477 group, group.deviceId());
1478 removeGroupEntry(group);
1479 break;
1480 case ADDED:
1481 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001482 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001483 case PENDING_UPDATE:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001484 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Charles Chan07f15f22018-05-08 21:35:50 -07001485 group.id(),
1486 group.deviceId(),
1487 group.state());
1488 group.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001489 //Re-PUT map entries to trigger map update events
Charles Chan07f15f22018-05-08 21:35:50 -07001490 getGroupStoreKeyMap().put(new GroupStoreKeyMapKey(group.deviceId(), group.appCookie()), group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001491 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1492 group));
1493 break;
1494 default:
1495 log.debug("Group {} has not been installed.", group);
1496 break;
1497 }
1498 }
1499
1500 private void extraneousGroup(Group group) {
Saurav Das0fd79d92016-03-07 10:58:36 -08001501 log.trace("Group {} is on device {} but not in store.",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001502 group, group.deviceId());
1503 addOrUpdateExtraneousGroupEntry(group);
1504 }
1505
1506 private void groupAdded(Group group) {
1507 log.trace("Group {} Added or Updated in device {}",
1508 group, group.deviceId());
1509 addOrUpdateGroupEntry(group);
1510 }
alshabib10580802015-02-18 18:30:33 -08001511}