blob: 97333ebfb501305ef5e5ae9dc13f80f3a0c9f681 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070063import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070064import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070065import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070066import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import org.onosproject.store.service.EventuallyConsistentMap;
68import org.onosproject.store.service.EventuallyConsistentMapBuilder;
69import org.onosproject.store.service.EventuallyConsistentMapEvent;
70import org.onosproject.store.service.EventuallyConsistentMapListener;
71import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080072import org.slf4j.Logger;
73
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.net.URI;
75import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070078import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070079import java.util.List;
80import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070081import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070082import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070083import java.util.concurrent.ConcurrentHashMap;
84import java.util.concurrent.ConcurrentMap;
85import java.util.concurrent.ExecutorService;
86import java.util.concurrent.Executors;
87import java.util.concurrent.atomic.AtomicInteger;
88import java.util.concurrent.atomic.AtomicLong;
89import java.util.stream.Collectors;
90
91import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080094
95/**
96 * Manages inventory of group entries using trivial in-memory implementation.
97 */
98@Component(immediate = true)
99@Service
100public class DistributedGroupStore
101 extends AbstractStore<GroupEvent, GroupStoreDelegate>
102 implements GroupStore {
103
104 private final Logger log = getLogger(getClass());
105
106 private final int dummyId = 0xffffffff;
107 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
108
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700116 protected StorageService storageService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700119 protected MastershipService mastershipService;
120
121 // Per device group table with (device id + app cookie) as key
122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> groupStoreEntriesByKey = null;
124 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700125 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
126 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700127 private EventuallyConsistentMap<GroupStoreKeyMapKey,
128 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
130 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700131 private ExecutorService messageHandlingExecutor;
132 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800133
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700134 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800135
136 private final AtomicInteger groupIdGen = new AtomicInteger();
137
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 private KryoNamespace.Builder kryoBuilder = null;
139
Madan Jampanibcf1a482015-06-24 19:05:56 -0700140 private final AtomicLong sequenceNumber = new AtomicLong(0);
141
alshabib10580802015-02-18 18:30:33 -0800142 @Activate
143 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700144 kryoBuilder = new KryoNamespace.Builder()
145 .register(DefaultGroup.class,
146 DefaultGroupBucket.class,
147 DefaultGroupDescription.class,
148 DefaultGroupKey.class,
149 GroupDescription.Type.class,
150 Group.GroupState.class,
151 GroupBuckets.class,
152 DefaultGroupId.class,
153 GroupStoreMessage.class,
154 GroupStoreMessage.Type.class,
155 UpdateType.class,
156 GroupStoreMessageSubjects.class,
157 MultiValuedTimestamp.class,
158 GroupStoreKeyMapKey.class,
159 GroupStoreIdMapKey.class,
160 GroupStoreMapKey.class
161 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700162 .register(new URISerializer(), URI.class)
163 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700164 .register(PortNumber.class)
165 .register(DefaultApplicationId.class)
166 .register(DefaultTrafficTreatment.class,
167 Instructions.DropInstruction.class,
168 Instructions.OutputInstruction.class,
169 Instructions.GroupInstruction.class,
170 Instructions.TableTypeTransition.class,
171 FlowRule.Type.class,
172 L0ModificationInstruction.class,
173 L0ModificationInstruction.L0SubType.class,
174 L0ModificationInstruction.ModLambdaInstruction.class,
175 L2ModificationInstruction.class,
176 L2ModificationInstruction.L2SubType.class,
177 L2ModificationInstruction.ModEtherInstruction.class,
178 L2ModificationInstruction.PushHeaderInstructions.class,
179 L2ModificationInstruction.ModVlanIdInstruction.class,
180 L2ModificationInstruction.ModVlanPcpInstruction.class,
181 L2ModificationInstruction.ModMplsLabelInstruction.class,
182 L2ModificationInstruction.ModMplsTtlInstruction.class,
183 L3ModificationInstruction.class,
184 L3ModificationInstruction.L3SubType.class,
185 L3ModificationInstruction.ModIPInstruction.class,
186 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
187 L3ModificationInstruction.ModTtlInstruction.class,
188 org.onlab.packet.MplsLabel.class
189 )
190 .register(org.onosproject.cluster.NodeId.class)
191 .register(KryoNamespaces.BASIC)
192 .register(KryoNamespaces.MISC);
193
194 messageHandlingExecutor = Executors.
195 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
196 groupedThreads("onos/store/group",
197 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700198
199 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
200 kryoBuilder.build()::deserialize,
201 this::process,
202 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700203
204 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700205 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
206 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
207
208 groupStoreEntriesByKey = keyMapBuilder
209 .withName("groupstorekeymap")
210 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700211 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
212 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700213 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700214 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700215 log.debug("Current size of groupstorekeymap:{}",
216 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700217
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700219 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
220 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
221
222 auditPendingReqQueue = auditMapBuilder
223 .withName("pendinggroupkeymap")
224 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700225 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
226 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700227 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700228 log.debug("Current size of pendinggroupkeymap:{}",
229 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230
alshabib10580802015-02-18 18:30:33 -0800231 log.info("Started");
232 }
233
234 @Deactivate
235 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700236 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800238 log.info("Stopped");
239 }
240
alshabib10580802015-02-18 18:30:33 -0800241 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800243 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
244 }
245
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700246 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
247 lazyEmptyGroupIdTable() {
248 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
249 }
250
alshabib10580802015-02-18 18:30:33 -0800251 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700252 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800253 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700254 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
257 getGroupStoreKeyMap() {
258 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700262 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700264 * @param deviceId identifier of the device
265 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800266 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700267 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
268 return createIfAbsentUnchecked(groupEntriesById,
269 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800270 }
271
272 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700273 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800274 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800276 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700277 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
278 getPendingGroupKeyTable() {
279 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800280 }
281
282 /**
283 * Returns the extraneous group id table for specified device.
284 *
285 * @param deviceId identifier of the device
286 * @return Map representing group key table of given device.
287 */
288 private ConcurrentMap<GroupId, Group>
289 getExtraneousGroupIdTable(DeviceId deviceId) {
290 return createIfAbsentUnchecked(extraneousGroupEntriesById,
291 deviceId,
292 lazyEmptyExtraneousGroupIdTable());
293 }
294
295 /**
296 * Returns the number of groups for the specified device in the store.
297 *
298 * @return number of groups for the specified device
299 */
300 @Override
301 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 return (getGroups(deviceId) != null) ?
303 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800304 }
305
306 /**
307 * Returns the groups associated with a device.
308 *
309 * @param deviceId the device ID
310 *
311 * @return the group entries
312 */
313 @Override
314 public Iterable<Group> getGroups(DeviceId deviceId) {
315 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700316 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700317 deviceId, getGroupStoreKeyMap().values().size());
318 return FluentIterable.from(getGroupStoreKeyMap().values())
319 .filter(input -> input.deviceId().equals(deviceId))
320 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800321 }
322
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700323 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
324 // flatten and make iterator unmodifiable
325 log.debug("getGroups: for device {} total number of groups {}",
326 deviceId, getGroupStoreKeyMap().values().size());
327 return FluentIterable.from(getGroupStoreKeyMap().values())
328 .filter(input -> input.deviceId().equals(deviceId));
329 }
330
alshabib10580802015-02-18 18:30:33 -0800331 /**
332 * Returns the stored group entry.
333 *
334 * @param deviceId the device ID
335 * @param appCookie the group key
336 *
337 * @return a group associated with the key
338 */
339 @Override
340 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700341 return getStoredGroupEntry(deviceId, appCookie);
342 }
343
344 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
345 GroupKey appCookie) {
346 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
347 appCookie));
348 }
349
350 @Override
351 public Group getGroup(DeviceId deviceId, GroupId groupId) {
352 return getStoredGroupEntry(deviceId, groupId);
353 }
354
355 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
356 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700357 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800358 }
359
360 private int getFreeGroupIdValue(DeviceId deviceId) {
361 int freeId = groupIdGen.incrementAndGet();
362
363 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700364 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800365 if (existing == null) {
366 existing = (
367 extraneousGroupEntriesById.get(deviceId) != null) ?
368 extraneousGroupEntriesById.get(deviceId).
369 get(new DefaultGroupId(freeId)) :
370 null;
371 }
372 if (existing != null) {
373 freeId = groupIdGen.incrementAndGet();
374 } else {
375 break;
376 }
377 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700378 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800379 return freeId;
380 }
381
382 /**
383 * Stores a new group entry using the information from group description.
384 *
385 * @param groupDesc group description to be used to create group entry
386 */
387 @Override
388 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700389 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800390 // Check if a group is existing with the same key
391 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 log.warn("Group already exists with the same key {}",
393 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800394 return;
395 }
396
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700397 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700398 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700399 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700400 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700401 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
402 log.error("No Master for device {}..."
403 + "Can not perform add group operation",
404 groupDesc.deviceId());
405 //TODO: Send Group operation failure event
406 return;
407 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700408 GroupStoreMessage groupOp = GroupStoreMessage.
409 createGroupAddRequestMsg(groupDesc.deviceId(),
410 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700411
Madan Jampani175e8fd2015-05-20 14:10:45 -0700412 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700413 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
414 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700415 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
416 if (error != null) {
417 log.warn("Failed to send request to master: {} to {}",
418 groupOp,
419 mastershipService.getMasterFor(groupDesc.deviceId()));
420 //TODO: Send Group operation failure event
421 } else {
422 log.debug("Sent Group operation request for device {} "
423 + "to remote MASTER {}",
424 groupDesc.deviceId(),
425 mastershipService.getMasterFor(groupDesc.deviceId()));
426 }
427 });
alshabib10580802015-02-18 18:30:33 -0800428 return;
429 }
430
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700431 log.debug("Store group for device {} is getting handled locally",
432 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800433 storeGroupDescriptionInternal(groupDesc);
434 }
435
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700436 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
437 ConcurrentMap<GroupId, Group> extraneousMap =
438 extraneousGroupEntriesById.get(deviceId);
439 if (extraneousMap == null) {
440 return null;
441 }
442 return extraneousMap.get(new DefaultGroupId(groupId));
443 }
444
445 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
446 GroupBuckets buckets) {
447 ConcurrentMap<GroupId, Group> extraneousMap =
448 extraneousGroupEntriesById.get(deviceId);
449 if (extraneousMap == null) {
450 return null;
451 }
452
453 for (Group extraneousGroup:extraneousMap.values()) {
454 if (extraneousGroup.buckets().equals(buckets)) {
455 return extraneousGroup;
456 }
457 }
458 return null;
459 }
460
alshabib10580802015-02-18 18:30:33 -0800461 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
462 // Check if a group is existing with the same key
463 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
464 return;
465 }
466
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700467 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
468 // Device group audit has not completed yet
469 // Add this group description to pending group key table
470 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700471 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700472 groupDesc.deviceId());
473 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
474 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
475 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
476 getPendingGroupKeyTable();
477 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
478 groupDesc.appCookie()),
479 group);
480 return;
481 }
482
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700483 Group matchingExtraneousGroup = null;
484 if (groupDesc.givenGroupId() != null) {
485 //Check if there is a extraneous group existing with the same Id
486 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
487 groupDesc.deviceId(), groupDesc.givenGroupId());
488 if (matchingExtraneousGroup != null) {
489 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
490 groupDesc.deviceId(),
491 groupDesc.givenGroupId());
492 //Check if the group buckets matches with user provided buckets
493 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
494 //Group is already existing with the same buckets and Id
495 // Create a group entry object
496 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
497 groupDesc.deviceId(),
498 groupDesc.givenGroupId());
499 StoredGroupEntry group = new DefaultGroup(
500 matchingExtraneousGroup.id(), groupDesc);
501 // Insert the newly created group entry into key and id maps
502 getGroupStoreKeyMap().
503 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
504 groupDesc.appCookie()), group);
505 // Ensure it also inserted into group id based table to
506 // avoid any chances of duplication in group id generation
507 getGroupIdTable(groupDesc.deviceId()).
508 put(matchingExtraneousGroup.id(), group);
509 addOrUpdateGroupEntry(matchingExtraneousGroup);
510 removeExtraneousGroupEntry(matchingExtraneousGroup);
511 return;
512 } else {
513 //Group buckets are not matching. Update group
514 //with user provided buckets.
515 //TODO
516 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
517 groupDesc.deviceId(),
518 groupDesc.givenGroupId());
519 }
520 }
521 } else {
522 //Check if there is an extraneous group with user provided buckets
523 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
524 groupDesc.deviceId(), groupDesc.buckets());
525 if (matchingExtraneousGroup != null) {
526 //Group is already existing with the same buckets.
527 //So reuse this group.
528 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
529 groupDesc.deviceId());
530 //Create a group entry object
531 StoredGroupEntry group = new DefaultGroup(
532 matchingExtraneousGroup.id(), groupDesc);
533 // Insert the newly created group entry into key and id maps
534 getGroupStoreKeyMap().
535 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
536 groupDesc.appCookie()), group);
537 // Ensure it also inserted into group id based table to
538 // avoid any chances of duplication in group id generation
539 getGroupIdTable(groupDesc.deviceId()).
540 put(matchingExtraneousGroup.id(), group);
541 addOrUpdateGroupEntry(matchingExtraneousGroup);
542 removeExtraneousGroupEntry(matchingExtraneousGroup);
543 return;
544 } else {
545 //TODO: Check if there are any empty groups that can be used here
546 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
547 groupDesc.deviceId());
548 }
549 }
550
Saurav Das100e3b82015-04-30 11:12:10 -0700551 GroupId id = null;
552 if (groupDesc.givenGroupId() == null) {
553 // Get a new group identifier
554 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
555 } else {
556 id = new DefaultGroupId(groupDesc.givenGroupId());
557 }
alshabib10580802015-02-18 18:30:33 -0800558 // Create a group entry object
559 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700560 // Insert the newly created group entry into key and id maps
561 getGroupStoreKeyMap().
562 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
563 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700564 // Ensure it also inserted into group id based table to
565 // avoid any chances of duplication in group id generation
566 getGroupIdTable(groupDesc.deviceId()).
567 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700568 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
569 id,
570 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800571 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
572 group));
573 }
574
575 /**
576 * Updates the existing group entry with the information
577 * from group description.
578 *
579 * @param deviceId the device ID
580 * @param oldAppCookie the current group key
581 * @param type update type
582 * @param newBuckets group buckets for updates
583 * @param newAppCookie optional new group key
584 */
585 @Override
586 public void updateGroupDescription(DeviceId deviceId,
587 GroupKey oldAppCookie,
588 UpdateType type,
589 GroupBuckets newBuckets,
590 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700591 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700592 if (mastershipService.getMasterFor(deviceId) != null &&
593 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700594 log.debug("updateGroupDescription: Device {} local role is not MASTER",
595 deviceId);
596 if (mastershipService.getMasterFor(deviceId) == null) {
597 log.error("No Master for device {}..."
598 + "Can not perform update group operation",
599 deviceId);
600 //TODO: Send Group operation failure event
601 return;
602 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700603 GroupStoreMessage groupOp = GroupStoreMessage.
604 createGroupUpdateRequestMsg(deviceId,
605 oldAppCookie,
606 type,
607 newBuckets,
608 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700609
Madan Jampani175e8fd2015-05-20 14:10:45 -0700610 clusterCommunicator.unicast(groupOp,
611 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
612 m -> kryoBuilder.build().serialize(m),
613 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
614 if (error != null) {
615 log.warn("Failed to send request to master: {} to {}",
616 groupOp,
617 mastershipService.getMasterFor(deviceId), error);
618 }
619 //TODO: Send Group operation failure event
620 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700621 return;
622 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700623 log.debug("updateGroupDescription for device {} is getting handled locally",
624 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700625 updateGroupDescriptionInternal(deviceId,
626 oldAppCookie,
627 type,
628 newBuckets,
629 newAppCookie);
630 }
631
632 private void updateGroupDescriptionInternal(DeviceId deviceId,
633 GroupKey oldAppCookie,
634 UpdateType type,
635 GroupBuckets newBuckets,
636 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800637 // Check if a group is existing with the provided key
638 Group oldGroup = getGroup(deviceId, oldAppCookie);
639 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700640 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800641 return;
642 }
643
644 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
645 type,
646 newBuckets);
647 if (newBucketList != null) {
648 // Create a new group object from the old group
649 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
650 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
651 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
652 oldGroup.deviceId(),
653 oldGroup.type(),
654 updatedBuckets,
655 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700656 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800657 oldGroup.appId());
658 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
659 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700660 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
661 oldGroup.id(),
662 oldGroup.deviceId(),
663 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800664 newGroup.setState(GroupState.PENDING_UPDATE);
665 newGroup.setLife(oldGroup.life());
666 newGroup.setPackets(oldGroup.packets());
667 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700668 //Update the group entry in groupkey based map.
669 //Update to groupid based map will happen in the
670 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700671 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
672 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700673 getGroupStoreKeyMap().
674 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
675 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800676 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700677 } else {
678 log.warn("updateGroupDescriptionInternal with type {}: No "
679 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800680 }
681 }
682
683 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
684 UpdateType type,
685 GroupBuckets buckets) {
686 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700687 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800688 boolean groupDescUpdated = false;
689
690 if (type == UpdateType.ADD) {
691 // Check if the any of the new buckets are part of
692 // the old bucket list
693 for (GroupBucket addBucket:buckets.buckets()) {
694 if (!newBucketList.contains(addBucket)) {
695 newBucketList.add(addBucket);
696 groupDescUpdated = true;
697 }
698 }
699 } else if (type == UpdateType.REMOVE) {
700 // Check if the to be removed buckets are part of the
701 // old bucket list
702 for (GroupBucket removeBucket:buckets.buckets()) {
703 if (newBucketList.contains(removeBucket)) {
704 newBucketList.remove(removeBucket);
705 groupDescUpdated = true;
706 }
707 }
708 }
709
710 if (groupDescUpdated) {
711 return newBucketList;
712 } else {
713 return null;
714 }
715 }
716
717 /**
718 * Triggers deleting the existing group entry.
719 *
720 * @param deviceId the device ID
721 * @param appCookie the group key
722 */
723 @Override
724 public void deleteGroupDescription(DeviceId deviceId,
725 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700726 // Check if group to be deleted by a remote instance
727 if (mastershipService.
728 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700729 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
730 deviceId);
731 if (mastershipService.getMasterFor(deviceId) == null) {
732 log.error("No Master for device {}..."
733 + "Can not perform delete group operation",
734 deviceId);
735 //TODO: Send Group operation failure event
736 return;
737 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700738 GroupStoreMessage groupOp = GroupStoreMessage.
739 createGroupDeleteRequestMsg(deviceId,
740 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700741
Madan Jampani175e8fd2015-05-20 14:10:45 -0700742 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700743 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
744 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700745 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
746 if (error != null) {
747 log.warn("Failed to send request to master: {} to {}",
748 groupOp,
749 mastershipService.getMasterFor(deviceId), error);
750 }
751 //TODO: Send Group operation failure event
752 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700753 return;
754 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700755 log.debug("deleteGroupDescription in device {} is getting handled locally",
756 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700757 deleteGroupDescriptionInternal(deviceId, appCookie);
758 }
759
760 private void deleteGroupDescriptionInternal(DeviceId deviceId,
761 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800762 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700763 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800764 if (existing == null) {
765 return;
766 }
767
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700768 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
769 existing.id(),
770 existing.deviceId(),
771 existing.state());
alshabib10580802015-02-18 18:30:33 -0800772 synchronized (existing) {
773 existing.setState(GroupState.PENDING_DELETE);
774 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700775 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
776 deviceId);
alshabib10580802015-02-18 18:30:33 -0800777 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
778 }
779
780 /**
781 * Stores a new group entry, or updates an existing entry.
782 *
783 * @param group group entry
784 */
785 @Override
786 public void addOrUpdateGroupEntry(Group group) {
787 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700788 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
789 group.id());
alshabib10580802015-02-18 18:30:33 -0800790 GroupEvent event = null;
791
792 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700793 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700794 group.id(),
795 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800796 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700797 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700798 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700799 existing.buckets().buckets()
800 .stream()
801 .filter((existingBucket)->(existingBucket.equals(bucket)))
802 .findFirst();
803 if (matchingBucket.isPresent()) {
804 ((StoredGroupBucketEntry) matchingBucket.
805 get()).setPackets(bucket.packets());
806 ((StoredGroupBucketEntry) matchingBucket.
807 get()).setBytes(bucket.bytes());
808 } else {
809 log.warn("addOrUpdateGroupEntry: No matching "
810 + "buckets to update stats");
811 }
812 }
alshabib10580802015-02-18 18:30:33 -0800813 existing.setLife(group.life());
814 existing.setPackets(group.packets());
815 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700816 if ((existing.state() == GroupState.PENDING_ADD) ||
817 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700818 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
819 existing.id(),
820 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700821 existing.state());
alshabib10580802015-02-18 18:30:33 -0800822 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700823 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800824 event = new GroupEvent(Type.GROUP_ADDED, existing);
825 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700826 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
827 existing.id(),
828 existing.deviceId(),
829 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700830 existing.setState(GroupState.ADDED);
831 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800832 event = new GroupEvent(Type.GROUP_UPDATED, existing);
833 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700834 //Re-PUT map entries to trigger map update events
835 getGroupStoreKeyMap().
836 put(new GroupStoreKeyMapKey(existing.deviceId(),
837 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800838 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700839 } else {
840 log.warn("addOrUpdateGroupEntry: Group update "
841 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800842 }
843
844 if (event != null) {
845 notifyDelegate(event);
846 }
847 }
848
849 /**
850 * Removes the group entry from store.
851 *
852 * @param group group entry
853 */
854 @Override
855 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700856 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
857 group.id());
alshabib10580802015-02-18 18:30:33 -0800858
859 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700860 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700861 group.id(),
862 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700863 //Removal from groupid based map will happen in the
864 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700865 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
866 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800867 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700868 } else {
869 log.warn("removeGroupEntry for {} in device{} is "
870 + "not existing in our maps",
871 group.id(),
872 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800873 }
874 }
875
876 @Override
877 public void deviceInitialAuditCompleted(DeviceId deviceId,
878 boolean completed) {
879 synchronized (deviceAuditStatus) {
880 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700881 log.debug("AUDIT completed for device {}",
882 deviceId);
alshabib10580802015-02-18 18:30:33 -0800883 deviceAuditStatus.put(deviceId, true);
884 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700885 List<StoredGroupEntry> pendingGroupRequests =
886 getPendingGroupKeyTable().values()
887 .stream()
888 .filter(g-> g.deviceId().equals(deviceId))
889 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700890 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700891 deviceId,
892 pendingGroupRequests.size());
893 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800894 GroupDescription tmp = new DefaultGroupDescription(
895 group.deviceId(),
896 group.type(),
897 group.buckets(),
898 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700899 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800900 group.appId());
901 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700902 getPendingGroupKeyTable().
903 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800904 }
alshabib10580802015-02-18 18:30:33 -0800905 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700906 Boolean audited = deviceAuditStatus.get(deviceId);
907 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700908 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800909 deviceAuditStatus.put(deviceId, false);
910 }
911 }
912 }
913 }
914
915 @Override
916 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
917 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700918 Boolean audited = deviceAuditStatus.get(deviceId);
919 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800920 }
921 }
922
923 @Override
924 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
925
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700926 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
927 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800928
929 if (existing == null) {
930 log.warn("No group entry with ID {} found ", operation.groupId());
931 return;
932 }
933
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700934 log.warn("groupOperationFailed: group operation {} failed"
935 + "for group {} in device {}",
936 operation.opType(),
937 existing.id(),
938 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800939 switch (operation.opType()) {
940 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700941 if (existing.state() == GroupState.PENDING_ADD) {
942 //TODO: Need to add support for passing the group
943 //operation failure reason from group provider.
944 //If the error type is anything other than GROUP_EXISTS,
945 //then the GROUP_ADD_FAILED event should be raised even
946 //in PENDING_ADD_RETRY state also.
947 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
948 log.warn("groupOperationFailed: cleaningup "
949 + "group {} from store in device {}....",
950 existing.id(),
951 existing.deviceId());
952 //Removal from groupid based map will happen in the
953 //map update listener
954 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
955 existing.appCookie()));
956 }
alshabib10580802015-02-18 18:30:33 -0800957 break;
958 case MODIFY:
959 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
960 break;
961 case DELETE:
962 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
963 break;
964 default:
965 log.warn("Unknown group operation type {}", operation.opType());
966 }
alshabib10580802015-02-18 18:30:33 -0800967 }
968
969 @Override
970 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700971 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700972 group.id(),
973 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800974 ConcurrentMap<GroupId, Group> extraneousIdTable =
975 getExtraneousGroupIdTable(group.deviceId());
976 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700977 // Don't remove the extraneous groups, instead re-use it when
978 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800979 }
980
981 @Override
982 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700983 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700984 group.id(),
985 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800986 ConcurrentMap<GroupId, Group> extraneousIdTable =
987 getExtraneousGroupIdTable(group.deviceId());
988 extraneousIdTable.remove(group.id());
989 }
990
991 @Override
992 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
993 // flatten and make iterator unmodifiable
994 return FluentIterable.from(
995 getExtraneousGroupIdTable(deviceId).values());
996 }
997
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700998 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700999 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001000 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001001 private class GroupStoreKeyMapListener implements
1002 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001003
1004 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001005 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -07001006 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001007 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001008 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001009 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001010 if ((key == null) && (group == null)) {
1011 log.error("GroupStoreKeyMapListener: Received "
1012 + "event {} with null entry", mapEvent.type());
1013 return;
1014 } else if (group == null) {
1015 group = getGroupIdTable(key.deviceId()).values()
1016 .stream()
1017 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1018 .findFirst().get();
1019 if (group == null) {
1020 log.error("GroupStoreKeyMapListener: Received "
1021 + "event {} with null entry... can not process", mapEvent.type());
1022 return;
1023 }
1024 }
1025 log.trace("received groupid map event {} for id {} in device {}",
1026 mapEvent.type(),
1027 group.id(),
1028 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001029 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001030 // Update the group ID table
1031 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001032 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1033 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1034 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1035 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001036 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1037 group.id(),
1038 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001039 } else {
1040 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1041 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001042 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1043 group.id(),
1044 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001045 }
1046 }
1047 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001048 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001049 // Remove the entry from the group ID table
1050 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001051 }
1052
1053 if (groupEvent != null) {
1054 notifyDelegate(groupEvent);
1055 }
1056 }
1057 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001058
1059 private void process(GroupStoreMessage groupOp) {
1060 log.debug("Received remote group operation {} request for device {}",
1061 groupOp.type(),
1062 groupOp.deviceId());
1063 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1064 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1065 return;
1066 }
1067 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1068 storeGroupDescriptionInternal(groupOp.groupDesc());
1069 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1070 updateGroupDescriptionInternal(groupOp.deviceId(),
1071 groupOp.appCookie(),
1072 groupOp.updateType(),
1073 groupOp.updateBuckets(),
1074 groupOp.newAppCookie());
1075 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1076 deleteGroupDescriptionInternal(groupOp.deviceId(),
1077 groupOp.appCookie());
1078 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001079 }
1080
1081 /**
1082 * Flattened map key to be used to store group entries.
1083 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001084 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001085 private final DeviceId deviceId;
1086
1087 public GroupStoreMapKey(DeviceId deviceId) {
1088 this.deviceId = deviceId;
1089 }
1090
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001091 public DeviceId deviceId() {
1092 return deviceId;
1093 }
1094
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001095 @Override
1096 public boolean equals(Object o) {
1097 if (this == o) {
1098 return true;
1099 }
1100 if (!(o instanceof GroupStoreMapKey)) {
1101 return false;
1102 }
1103 GroupStoreMapKey that = (GroupStoreMapKey) o;
1104 return this.deviceId.equals(that.deviceId);
1105 }
1106
1107 @Override
1108 public int hashCode() {
1109 int result = 17;
1110
1111 result = 31 * result + Objects.hash(this.deviceId);
1112
1113 return result;
1114 }
1115 }
1116
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001117 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001118 private final GroupKey appCookie;
1119 public GroupStoreKeyMapKey(DeviceId deviceId,
1120 GroupKey appCookie) {
1121 super(deviceId);
1122 this.appCookie = appCookie;
1123 }
1124
1125 @Override
1126 public boolean equals(Object o) {
1127 if (this == o) {
1128 return true;
1129 }
1130 if (!(o instanceof GroupStoreKeyMapKey)) {
1131 return false;
1132 }
1133 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1134 return (super.equals(that) &&
1135 this.appCookie.equals(that.appCookie));
1136 }
1137
1138 @Override
1139 public int hashCode() {
1140 int result = 17;
1141
1142 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1143
1144 return result;
1145 }
1146 }
1147
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001148 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001149 private final GroupId groupId;
1150 public GroupStoreIdMapKey(DeviceId deviceId,
1151 GroupId groupId) {
1152 super(deviceId);
1153 this.groupId = groupId;
1154 }
1155
1156 @Override
1157 public boolean equals(Object o) {
1158 if (this == o) {
1159 return true;
1160 }
1161 if (!(o instanceof GroupStoreIdMapKey)) {
1162 return false;
1163 }
1164 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1165 return (super.equals(that) &&
1166 this.groupId.equals(that.groupId));
1167 }
1168
1169 @Override
1170 public int hashCode() {
1171 int result = 17;
1172
1173 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1174
1175 return result;
1176 }
1177 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001178
1179 @Override
1180 public void pushGroupMetrics(DeviceId deviceId,
1181 Collection<Group> groupEntries) {
1182 boolean deviceInitialAuditStatus =
1183 deviceInitialAuditStatus(deviceId);
1184 Set<Group> southboundGroupEntries =
1185 Sets.newHashSet(groupEntries);
1186 Set<StoredGroupEntry> storedGroupEntries =
1187 Sets.newHashSet(getStoredGroups(deviceId));
1188 Set<Group> extraneousStoredEntries =
1189 Sets.newHashSet(getExtraneousGroups(deviceId));
1190
1191 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1192 southboundGroupEntries.size(),
1193 deviceId);
1194 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1195 Group group = it.next();
1196 log.trace("Group {} in device {}", group, deviceId);
1197 }
1198
1199 log.trace("Displaying all ({}) stored group entries for device {}",
1200 storedGroupEntries.size(),
1201 deviceId);
1202 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1203 it1.hasNext();) {
1204 Group group = it1.next();
1205 log.trace("Stored Group {} for device {}", group, deviceId);
1206 }
1207
1208 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1209 Group group = it2.next();
1210 if (storedGroupEntries.remove(group)) {
1211 // we both have the group, let's update some info then.
1212 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1213 group.id(), deviceId);
1214 groupAdded(group);
1215 it2.remove();
1216 }
1217 }
1218 for (Group group : southboundGroupEntries) {
1219 if (getGroup(group.deviceId(), group.id()) != null) {
1220 // There is a group existing with the same id
1221 // It is possible that group update is
1222 // in progress while we got a stale info from switch
1223 if (!storedGroupEntries.remove(getGroup(
1224 group.deviceId(), group.id()))) {
1225 log.warn("Group AUDIT: Inconsistent state:"
1226 + "Group exists in ID based table while "
1227 + "not present in key based table");
1228 }
1229 } else {
1230 // there are groups in the switch that aren't in the store
1231 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1232 group.id(), deviceId);
1233 extraneousStoredEntries.remove(group);
1234 extraneousGroup(group);
1235 }
1236 }
1237 for (Group group : storedGroupEntries) {
1238 // there are groups in the store that aren't in the switch
1239 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1240 group.id(), deviceId);
1241 groupMissing(group);
1242 }
1243 for (Group group : extraneousStoredEntries) {
1244 // there are groups in the extraneous store that
1245 // aren't in the switch
1246 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1247 group.id(), deviceId);
1248 removeExtraneousGroupEntry(group);
1249 }
1250
1251 if (!deviceInitialAuditStatus) {
1252 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1253 deviceId);
1254 deviceInitialAuditCompleted(deviceId, true);
1255 }
1256 }
1257
1258 private void groupMissing(Group group) {
1259 switch (group.state()) {
1260 case PENDING_DELETE:
1261 log.debug("Group {} delete confirmation from device {}",
1262 group, group.deviceId());
1263 removeGroupEntry(group);
1264 break;
1265 case ADDED:
1266 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001267 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001268 case PENDING_UPDATE:
1269 log.debug("Group {} is in store but not on device {}",
1270 group, group.deviceId());
1271 StoredGroupEntry existing =
1272 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001273 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001274 existing.id(),
1275 existing.deviceId(),
1276 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001277 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001278 //Re-PUT map entries to trigger map update events
1279 getGroupStoreKeyMap().
1280 put(new GroupStoreKeyMapKey(existing.deviceId(),
1281 existing.appCookie()), existing);
1282 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1283 group));
1284 break;
1285 default:
1286 log.debug("Group {} has not been installed.", group);
1287 break;
1288 }
1289 }
1290
1291 private void extraneousGroup(Group group) {
1292 log.debug("Group {} is on device {} but not in store.",
1293 group, group.deviceId());
1294 addOrUpdateExtraneousGroupEntry(group);
1295 }
1296
1297 private void groupAdded(Group group) {
1298 log.trace("Group {} Added or Updated in device {}",
1299 group, group.deviceId());
1300 addOrUpdateGroupEntry(group);
1301 }
alshabib10580802015-02-18 18:30:33 -08001302}