blob: cf48dcb804e2c8fb820fdc7b13447be3ba7f5144 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070063import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070064import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070065import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070066import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import org.onosproject.store.service.EventuallyConsistentMap;
68import org.onosproject.store.service.EventuallyConsistentMapBuilder;
69import org.onosproject.store.service.EventuallyConsistentMapEvent;
70import org.onosproject.store.service.EventuallyConsistentMapListener;
71import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080072import org.slf4j.Logger;
73
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.net.URI;
75import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070078import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070079import java.util.List;
80import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070081import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070082import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070083import java.util.concurrent.ConcurrentHashMap;
84import java.util.concurrent.ConcurrentMap;
85import java.util.concurrent.ExecutorService;
86import java.util.concurrent.Executors;
87import java.util.concurrent.atomic.AtomicInteger;
88import java.util.concurrent.atomic.AtomicLong;
89import java.util.stream.Collectors;
90
91import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080094
95/**
96 * Manages inventory of group entries using eventually consistent maps, forwarding requests to the device master when this instance is not the master.
97 */
98@Component(immediate = true)
99@Service
100public class DistributedGroupStore
101 extends AbstractStore<GroupEvent, GroupStoreDelegate>
102 implements GroupStore {
103
104 private final Logger log = getLogger(getClass());
105
106 private final int dummyId = 0xffffffff;
107 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
108
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700116 protected StorageService storageService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700119 protected MastershipService mastershipService;
120
121 // Per device group table with (device id + app cookie) as key
122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> groupStoreEntriesByKey = null;
124 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700125 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
126 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700127 private EventuallyConsistentMap<GroupStoreKeyMapKey,
128 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
130 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700131 private ExecutorService messageHandlingExecutor;
132 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800133
134 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
135 new HashMap<DeviceId, Boolean>();
136
137 private final AtomicInteger groupIdGen = new AtomicInteger();
138
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700139 private KryoNamespace.Builder kryoBuilder = null;
140
Madan Jampanibcf1a482015-06-24 19:05:56 -0700141 private final AtomicLong sequenceNumber = new AtomicLong(0);
142
alshabib10580802015-02-18 18:30:33 -0800143 @Activate
144 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700145 kryoBuilder = new KryoNamespace.Builder()
146 .register(DefaultGroup.class,
147 DefaultGroupBucket.class,
148 DefaultGroupDescription.class,
149 DefaultGroupKey.class,
150 GroupDescription.Type.class,
151 Group.GroupState.class,
152 GroupBuckets.class,
153 DefaultGroupId.class,
154 GroupStoreMessage.class,
155 GroupStoreMessage.Type.class,
156 UpdateType.class,
157 GroupStoreMessageSubjects.class,
158 MultiValuedTimestamp.class,
159 GroupStoreKeyMapKey.class,
160 GroupStoreIdMapKey.class,
161 GroupStoreMapKey.class
162 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700163 .register(new URISerializer(), URI.class)
164 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700165 .register(PortNumber.class)
166 .register(DefaultApplicationId.class)
167 .register(DefaultTrafficTreatment.class,
168 Instructions.DropInstruction.class,
169 Instructions.OutputInstruction.class,
170 Instructions.GroupInstruction.class,
171 Instructions.TableTypeTransition.class,
172 FlowRule.Type.class,
173 L0ModificationInstruction.class,
174 L0ModificationInstruction.L0SubType.class,
175 L0ModificationInstruction.ModLambdaInstruction.class,
176 L2ModificationInstruction.class,
177 L2ModificationInstruction.L2SubType.class,
178 L2ModificationInstruction.ModEtherInstruction.class,
179 L2ModificationInstruction.PushHeaderInstructions.class,
180 L2ModificationInstruction.ModVlanIdInstruction.class,
181 L2ModificationInstruction.ModVlanPcpInstruction.class,
182 L2ModificationInstruction.ModMplsLabelInstruction.class,
183 L2ModificationInstruction.ModMplsTtlInstruction.class,
184 L3ModificationInstruction.class,
185 L3ModificationInstruction.L3SubType.class,
186 L3ModificationInstruction.ModIPInstruction.class,
187 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
188 L3ModificationInstruction.ModTtlInstruction.class,
189 org.onlab.packet.MplsLabel.class
190 )
191 .register(org.onosproject.cluster.NodeId.class)
192 .register(KryoNamespaces.BASIC)
193 .register(KryoNamespaces.MISC);
194
195 messageHandlingExecutor = Executors.
196 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
197 groupedThreads("onos/store/group",
198 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700199
200 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
201 kryoBuilder.build()::deserialize,
202 this::process,
203 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700204
205 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700206 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
207 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
208
209 groupStoreEntriesByKey = keyMapBuilder
210 .withName("groupstorekeymap")
211 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700212 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
213 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700214 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700215 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700216 log.debug("Current size of groupstorekeymap:{}",
217 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700219 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700220 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
221 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
222
223 auditPendingReqQueue = auditMapBuilder
224 .withName("pendinggroupkeymap")
225 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700226 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
227 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700228 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700229 log.debug("Current size of pendinggroupkeymap:{}",
230 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700231
alshabib10580802015-02-18 18:30:33 -0800232 log.info("Started");
233 }
234
235 @Deactivate
236 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700238 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800239 log.info("Stopped");
240 }
241
alshabib10580802015-02-18 18:30:33 -0800242 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700243 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800244 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
245 }
246
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700247 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
248 lazyEmptyGroupIdTable() {
249 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
250 }
251
alshabib10580802015-02-18 18:30:33 -0800252 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700253 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800254 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700255 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800256 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700257 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
258 getGroupStoreKeyMap() {
259 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800260 }
261
262 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700263 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800264 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700265 * @param deviceId identifier of the device
266 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800267 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700268 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
269 return createIfAbsentUnchecked(groupEntriesById,
270 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800271 }
272
273 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700274 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800275 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700276 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800277 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700278 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
279 getPendingGroupKeyTable() {
280 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800281 }
282
283 /**
284 * Returns the extraneous group id table for specified device.
285 *
286 * @param deviceId identifier of the device
287 * @return Map representing group key table of given device.
288 */
289 private ConcurrentMap<GroupId, Group>
290 getExtraneousGroupIdTable(DeviceId deviceId) {
291 return createIfAbsentUnchecked(extraneousGroupEntriesById,
292 deviceId,
293 lazyEmptyExtraneousGroupIdTable());
294 }
295
296 /**
297 * Returns the number of groups for the specified device in the store.
298 *
299 * @return number of groups for the specified device
300 */
301 @Override
302 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700303 return (getGroups(deviceId) != null) ?
304 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800305 }
306
307 /**
308 * Returns the groups associated with a device.
309 *
310 * @param deviceId the device ID
311 *
312 * @return the group entries
313 */
314 @Override
315 public Iterable<Group> getGroups(DeviceId deviceId) {
316 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700317 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700318 deviceId, getGroupStoreKeyMap().values().size());
319 return FluentIterable.from(getGroupStoreKeyMap().values())
320 .filter(input -> input.deviceId().equals(deviceId))
321 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800322 }
323
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700324 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
325 // flatten and make iterator unmodifiable
326 log.debug("getGroups: for device {} total number of groups {}",
327 deviceId, getGroupStoreKeyMap().values().size());
328 return FluentIterable.from(getGroupStoreKeyMap().values())
329 .filter(input -> input.deviceId().equals(deviceId));
330 }
331
alshabib10580802015-02-18 18:30:33 -0800332 /**
333 * Returns the stored group entry.
334 *
335 * @param deviceId the device ID
336 * @param appCookie the group key
337 *
338 * @return a group associated with the key
339 */
340 @Override
341 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700342 return getStoredGroupEntry(deviceId, appCookie);
343 }
344
345 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
346 GroupKey appCookie) {
347 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
348 appCookie));
349 }
350
351 @Override
352 public Group getGroup(DeviceId deviceId, GroupId groupId) {
353 return getStoredGroupEntry(deviceId, groupId);
354 }
355
356 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
357 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700358 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800359 }
360
361 private int getFreeGroupIdValue(DeviceId deviceId) {
362 int freeId = groupIdGen.incrementAndGet();
363
364 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700365 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800366 if (existing == null) {
367 existing = (
368 extraneousGroupEntriesById.get(deviceId) != null) ?
369 extraneousGroupEntriesById.get(deviceId).
370 get(new DefaultGroupId(freeId)) :
371 null;
372 }
373 if (existing != null) {
374 freeId = groupIdGen.incrementAndGet();
375 } else {
376 break;
377 }
378 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700379 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800380 return freeId;
381 }
382
383 /**
384 * Stores a new group entry using the information from group description.
385 *
386 * @param groupDesc group description to be used to create group entry
387 */
388 @Override
389 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700390 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800391 // Check if a group is existing with the same key
392 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700393 log.warn("Group already exists with the same key {}",
394 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800395 return;
396 }
397
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700398 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700399 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700400 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700401 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700402 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
403 log.error("No Master for device {}..."
404 + "Can not perform add group operation",
405 groupDesc.deviceId());
406 //TODO: Send Group operation failure event
407 return;
408 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700409 GroupStoreMessage groupOp = GroupStoreMessage.
410 createGroupAddRequestMsg(groupDesc.deviceId(),
411 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700412
Madan Jampani175e8fd2015-05-20 14:10:45 -0700413 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700414 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
415 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700416 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
417 if (error != null) {
418 log.warn("Failed to send request to master: {} to {}",
419 groupOp,
420 mastershipService.getMasterFor(groupDesc.deviceId()));
421 //TODO: Send Group operation failure event
422 } else {
423 log.debug("Sent Group operation request for device {} "
424 + "to remote MASTER {}",
425 groupDesc.deviceId(),
426 mastershipService.getMasterFor(groupDesc.deviceId()));
427 }
428 });
alshabib10580802015-02-18 18:30:33 -0800429 return;
430 }
431
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700432 log.debug("Store group for device {} is getting handled locally",
433 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800434 storeGroupDescriptionInternal(groupDesc);
435 }
436
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700437 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
438 ConcurrentMap<GroupId, Group> extraneousMap =
439 extraneousGroupEntriesById.get(deviceId);
440 if (extraneousMap == null) {
441 return null;
442 }
443 return extraneousMap.get(new DefaultGroupId(groupId));
444 }
445
446 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
447 GroupBuckets buckets) {
448 ConcurrentMap<GroupId, Group> extraneousMap =
449 extraneousGroupEntriesById.get(deviceId);
450 if (extraneousMap == null) {
451 return null;
452 }
453
454 for (Group extraneousGroup:extraneousMap.values()) {
455 if (extraneousGroup.buckets().equals(buckets)) {
456 return extraneousGroup;
457 }
458 }
459 return null;
460 }
461
alshabib10580802015-02-18 18:30:33 -0800462 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
463 // Check if a group is existing with the same key
464 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
465 return;
466 }
467
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700468 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
469 // Device group audit has not completed yet
470 // Add this group description to pending group key table
471 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700472 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700473 groupDesc.deviceId());
474 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
475 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
476 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
477 getPendingGroupKeyTable();
478 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
479 groupDesc.appCookie()),
480 group);
481 return;
482 }
483
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700484 Group matchingExtraneousGroup = null;
485 if (groupDesc.givenGroupId() != null) {
486 //Check if there is a extraneous group existing with the same Id
487 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
488 groupDesc.deviceId(), groupDesc.givenGroupId());
489 if (matchingExtraneousGroup != null) {
490 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
491 groupDesc.deviceId(),
492 groupDesc.givenGroupId());
493 //Check if the group buckets matches with user provided buckets
494 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
495 //Group is already existing with the same buckets and Id
496 // Create a group entry object
497 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
498 groupDesc.deviceId(),
499 groupDesc.givenGroupId());
500 StoredGroupEntry group = new DefaultGroup(
501 matchingExtraneousGroup.id(), groupDesc);
502 // Insert the newly created group entry into key and id maps
503 getGroupStoreKeyMap().
504 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
505 groupDesc.appCookie()), group);
506 // Ensure it also inserted into group id based table to
507 // avoid any chances of duplication in group id generation
508 getGroupIdTable(groupDesc.deviceId()).
509 put(matchingExtraneousGroup.id(), group);
510 addOrUpdateGroupEntry(matchingExtraneousGroup);
511 removeExtraneousGroupEntry(matchingExtraneousGroup);
512 return;
513 } else {
514 //Group buckets are not matching. Update group
515 //with user provided buckets.
516 //TODO
517 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
518 groupDesc.deviceId(),
519 groupDesc.givenGroupId());
520 }
521 }
522 } else {
523 //Check if there is an extraneous group with user provided buckets
524 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
525 groupDesc.deviceId(), groupDesc.buckets());
526 if (matchingExtraneousGroup != null) {
527 //Group is already existing with the same buckets.
528 //So reuse this group.
529 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
530 groupDesc.deviceId());
531 //Create a group entry object
532 StoredGroupEntry group = new DefaultGroup(
533 matchingExtraneousGroup.id(), groupDesc);
534 // Insert the newly created group entry into key and id maps
535 getGroupStoreKeyMap().
536 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
537 groupDesc.appCookie()), group);
538 // Ensure it also inserted into group id based table to
539 // avoid any chances of duplication in group id generation
540 getGroupIdTable(groupDesc.deviceId()).
541 put(matchingExtraneousGroup.id(), group);
542 addOrUpdateGroupEntry(matchingExtraneousGroup);
543 removeExtraneousGroupEntry(matchingExtraneousGroup);
544 return;
545 } else {
546 //TODO: Check if there are any empty groups that can be used here
547 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
548 groupDesc.deviceId());
549 }
550 }
551
Saurav Das100e3b82015-04-30 11:12:10 -0700552 GroupId id = null;
553 if (groupDesc.givenGroupId() == null) {
554 // Get a new group identifier
555 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
556 } else {
557 id = new DefaultGroupId(groupDesc.givenGroupId());
558 }
alshabib10580802015-02-18 18:30:33 -0800559 // Create a group entry object
560 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700561 // Insert the newly created group entry into key and id maps
562 getGroupStoreKeyMap().
563 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
564 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700565 // Ensure it also inserted into group id based table to
566 // avoid any chances of duplication in group id generation
567 getGroupIdTable(groupDesc.deviceId()).
568 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700569 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
570 id,
571 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800572 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
573 group));
574 }
575
576 /**
577 * Updates the existing group entry with the information
578 * from group description.
579 *
580 * @param deviceId the device ID
581 * @param oldAppCookie the current group key
582 * @param type update type
583 * @param newBuckets group buckets for updates
584 * @param newAppCookie optional new group key
585 */
586 @Override
587 public void updateGroupDescription(DeviceId deviceId,
588 GroupKey oldAppCookie,
589 UpdateType type,
590 GroupBuckets newBuckets,
591 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700592 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700593 if (mastershipService.getMasterFor(deviceId) != null &&
594 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700595 log.debug("updateGroupDescription: Device {} local role is not MASTER",
596 deviceId);
597 if (mastershipService.getMasterFor(deviceId) == null) {
598 log.error("No Master for device {}..."
599 + "Can not perform update group operation",
600 deviceId);
601 //TODO: Send Group operation failure event
602 return;
603 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700604 GroupStoreMessage groupOp = GroupStoreMessage.
605 createGroupUpdateRequestMsg(deviceId,
606 oldAppCookie,
607 type,
608 newBuckets,
609 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700610
Madan Jampani175e8fd2015-05-20 14:10:45 -0700611 clusterCommunicator.unicast(groupOp,
612 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
613 m -> kryoBuilder.build().serialize(m),
614 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
615 if (error != null) {
616 log.warn("Failed to send request to master: {} to {}",
617 groupOp,
618 mastershipService.getMasterFor(deviceId), error);
619 }
620 //TODO: Send Group operation failure event
621 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700622 return;
623 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700624 log.debug("updateGroupDescription for device {} is getting handled locally",
625 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700626 updateGroupDescriptionInternal(deviceId,
627 oldAppCookie,
628 type,
629 newBuckets,
630 newAppCookie);
631 }
632
633 private void updateGroupDescriptionInternal(DeviceId deviceId,
634 GroupKey oldAppCookie,
635 UpdateType type,
636 GroupBuckets newBuckets,
637 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800638 // Check if a group is existing with the provided key
639 Group oldGroup = getGroup(deviceId, oldAppCookie);
640 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700641 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800642 return;
643 }
644
645 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
646 type,
647 newBuckets);
648 if (newBucketList != null) {
649 // Create a new group object from the old group
650 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
651 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
652 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
653 oldGroup.deviceId(),
654 oldGroup.type(),
655 updatedBuckets,
656 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700657 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800658 oldGroup.appId());
659 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
660 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700661 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
662 oldGroup.id(),
663 oldGroup.deviceId(),
664 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800665 newGroup.setState(GroupState.PENDING_UPDATE);
666 newGroup.setLife(oldGroup.life());
667 newGroup.setPackets(oldGroup.packets());
668 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700669 //Update the group entry in groupkey based map.
670 //Update to groupid based map will happen in the
671 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700672 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
673 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700674 getGroupStoreKeyMap().
675 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
676 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800677 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700678 } else {
679 log.warn("updateGroupDescriptionInternal with type {}: No "
680 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800681 }
682 }
683
684 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
685 UpdateType type,
686 GroupBuckets buckets) {
687 GroupBuckets oldBuckets = oldGroup.buckets();
688 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
689 oldBuckets.buckets());
690 boolean groupDescUpdated = false;
691
692 if (type == UpdateType.ADD) {
693 // Check if the any of the new buckets are part of
694 // the old bucket list
695 for (GroupBucket addBucket:buckets.buckets()) {
696 if (!newBucketList.contains(addBucket)) {
697 newBucketList.add(addBucket);
698 groupDescUpdated = true;
699 }
700 }
701 } else if (type == UpdateType.REMOVE) {
702 // Check if the to be removed buckets are part of the
703 // old bucket list
704 for (GroupBucket removeBucket:buckets.buckets()) {
705 if (newBucketList.contains(removeBucket)) {
706 newBucketList.remove(removeBucket);
707 groupDescUpdated = true;
708 }
709 }
710 }
711
712 if (groupDescUpdated) {
713 return newBucketList;
714 } else {
715 return null;
716 }
717 }
718
719 /**
720 * Triggers deleting the existing group entry.
721 *
722 * @param deviceId the device ID
723 * @param appCookie the group key
724 */
725 @Override
726 public void deleteGroupDescription(DeviceId deviceId,
727 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700728 // Check if group to be deleted by a remote instance
729 if (mastershipService.
730 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700731 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
732 deviceId);
733 if (mastershipService.getMasterFor(deviceId) == null) {
734 log.error("No Master for device {}..."
735 + "Can not perform delete group operation",
736 deviceId);
737 //TODO: Send Group operation failure event
738 return;
739 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700740 GroupStoreMessage groupOp = GroupStoreMessage.
741 createGroupDeleteRequestMsg(deviceId,
742 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700743
Madan Jampani175e8fd2015-05-20 14:10:45 -0700744 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700745 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
746 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700747 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
748 if (error != null) {
749 log.warn("Failed to send request to master: {} to {}",
750 groupOp,
751 mastershipService.getMasterFor(deviceId), error);
752 }
753 //TODO: Send Group operation failure event
754 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700755 return;
756 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700757 log.debug("deleteGroupDescription in device {} is getting handled locally",
758 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700759 deleteGroupDescriptionInternal(deviceId, appCookie);
760 }
761
762 private void deleteGroupDescriptionInternal(DeviceId deviceId,
763 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800764 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700765 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800766 if (existing == null) {
767 return;
768 }
769
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700770 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
771 existing.id(),
772 existing.deviceId(),
773 existing.state());
alshabib10580802015-02-18 18:30:33 -0800774 synchronized (existing) {
775 existing.setState(GroupState.PENDING_DELETE);
776 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700777 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
778 deviceId);
alshabib10580802015-02-18 18:30:33 -0800779 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
780 }
781
782 /**
783 * Stores a new group entry, or updates an existing entry.
784 *
785 * @param group group entry
786 */
787 @Override
788 public void addOrUpdateGroupEntry(Group group) {
789 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700790 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
791 group.id());
alshabib10580802015-02-18 18:30:33 -0800792 GroupEvent event = null;
793
794 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700795 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700796 group.id(),
797 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800798 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700799 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700800 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700801 existing.buckets().buckets()
802 .stream()
803 .filter((existingBucket)->(existingBucket.equals(bucket)))
804 .findFirst();
805 if (matchingBucket.isPresent()) {
806 ((StoredGroupBucketEntry) matchingBucket.
807 get()).setPackets(bucket.packets());
808 ((StoredGroupBucketEntry) matchingBucket.
809 get()).setBytes(bucket.bytes());
810 } else {
811 log.warn("addOrUpdateGroupEntry: No matching "
812 + "buckets to update stats");
813 }
814 }
alshabib10580802015-02-18 18:30:33 -0800815 existing.setLife(group.life());
816 existing.setPackets(group.packets());
817 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700818 if ((existing.state() == GroupState.PENDING_ADD) ||
819 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700820 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
821 existing.id(),
822 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700823 existing.state());
alshabib10580802015-02-18 18:30:33 -0800824 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700825 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800826 event = new GroupEvent(Type.GROUP_ADDED, existing);
827 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700828 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
829 existing.id(),
830 existing.deviceId(),
831 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700832 existing.setState(GroupState.ADDED);
833 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800834 event = new GroupEvent(Type.GROUP_UPDATED, existing);
835 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700836 //Re-PUT map entries to trigger map update events
837 getGroupStoreKeyMap().
838 put(new GroupStoreKeyMapKey(existing.deviceId(),
839 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800840 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700841 } else {
842 log.warn("addOrUpdateGroupEntry: Group update "
843 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800844 }
845
846 if (event != null) {
847 notifyDelegate(event);
848 }
849 }
850
851 /**
852 * Removes the group entry from store.
853 *
854 * @param group group entry
855 */
856 @Override
857 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700858 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
859 group.id());
alshabib10580802015-02-18 18:30:33 -0800860
861 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700862 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700863 group.id(),
864 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700865 //Removal from groupid based map will happen in the
866 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700867 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
868 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800869 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700870 } else {
871 log.warn("removeGroupEntry for {} in device{} is "
872 + "not existing in our maps",
873 group.id(),
874 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800875 }
876 }
877
878 @Override
879 public void deviceInitialAuditCompleted(DeviceId deviceId,
880 boolean completed) {
881 synchronized (deviceAuditStatus) {
882 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700883 log.debug("AUDIT completed for device {}",
884 deviceId);
alshabib10580802015-02-18 18:30:33 -0800885 deviceAuditStatus.put(deviceId, true);
886 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700887 List<StoredGroupEntry> pendingGroupRequests =
888 getPendingGroupKeyTable().values()
889 .stream()
890 .filter(g-> g.deviceId().equals(deviceId))
891 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700892 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700893 deviceId,
894 pendingGroupRequests.size());
895 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800896 GroupDescription tmp = new DefaultGroupDescription(
897 group.deviceId(),
898 group.type(),
899 group.buckets(),
900 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700901 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800902 group.appId());
903 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700904 getPendingGroupKeyTable().
905 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800906 }
alshabib10580802015-02-18 18:30:33 -0800907 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700908 Boolean audited = deviceAuditStatus.get(deviceId);
909 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700910 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800911 deviceAuditStatus.put(deviceId, false);
912 }
913 }
914 }
915 }
916
917 @Override
918 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
919 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700920 Boolean audited = deviceAuditStatus.get(deviceId);
921 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800922 }
923 }
924
925 @Override
926 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
927
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700928 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
929 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800930
931 if (existing == null) {
932 log.warn("No group entry with ID {} found ", operation.groupId());
933 return;
934 }
935
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700936 log.warn("groupOperationFailed: group operation {} failed"
937 + "for group {} in device {}",
938 operation.opType(),
939 existing.id(),
940 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800941 switch (operation.opType()) {
942 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700943 if (existing.state() == GroupState.PENDING_ADD) {
944 //TODO: Need to add support for passing the group
945 //operation failure reason from group provider.
946 //If the error type is anything other than GROUP_EXISTS,
947 //then the GROUP_ADD_FAILED event should be raised even
948 //in PENDING_ADD_RETRY state also.
949 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
950 log.warn("groupOperationFailed: cleaningup "
951 + "group {} from store in device {}....",
952 existing.id(),
953 existing.deviceId());
954 //Removal from groupid based map will happen in the
955 //map update listener
956 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
957 existing.appCookie()));
958 }
alshabib10580802015-02-18 18:30:33 -0800959 break;
960 case MODIFY:
961 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
962 break;
963 case DELETE:
964 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
965 break;
966 default:
967 log.warn("Unknown group operation type {}", operation.opType());
968 }
alshabib10580802015-02-18 18:30:33 -0800969 }
970
971 @Override
972 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700973 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700974 group.id(),
975 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800976 ConcurrentMap<GroupId, Group> extraneousIdTable =
977 getExtraneousGroupIdTable(group.deviceId());
978 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700979 // Don't remove the extraneous groups, instead re-use it when
980 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800981 }
982
983 @Override
984 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700985 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700986 group.id(),
987 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800988 ConcurrentMap<GroupId, Group> extraneousIdTable =
989 getExtraneousGroupIdTable(group.deviceId());
990 extraneousIdTable.remove(group.id());
991 }
992
993 @Override
994 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
995 // flatten and make iterator unmodifiable
996 return FluentIterable.from(
997 getExtraneousGroupIdTable(deviceId).values());
998 }
999
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001000 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001001 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001002 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001003 private class GroupStoreKeyMapListener implements
1004 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001005
1006 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001007 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -07001008 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001009 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001010 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001011 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001012 if ((key == null) && (group == null)) {
1013 log.error("GroupStoreKeyMapListener: Received "
1014 + "event {} with null entry", mapEvent.type());
1015 return;
1016 } else if (group == null) {
1017 group = getGroupIdTable(key.deviceId()).values()
1018 .stream()
1019 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1020 .findFirst().get();
1021 if (group == null) {
1022 log.error("GroupStoreKeyMapListener: Received "
1023 + "event {} with null entry... can not process", mapEvent.type());
1024 return;
1025 }
1026 }
1027 log.trace("received groupid map event {} for id {} in device {}",
1028 mapEvent.type(),
1029 group.id(),
1030 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001031 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001032 // Update the group ID table
1033 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001034 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1035 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1036 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1037 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001038 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1039 group.id(),
1040 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001041 } else {
1042 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1043 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001044 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1045 group.id(),
1046 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001047 }
1048 }
1049 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001050 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001051 // Remove the entry from the group ID table
1052 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001053 }
1054
1055 if (groupEvent != null) {
1056 notifyDelegate(groupEvent);
1057 }
1058 }
1059 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001060
1061 private void process(GroupStoreMessage groupOp) {
1062 log.debug("Received remote group operation {} request for device {}",
1063 groupOp.type(),
1064 groupOp.deviceId());
1065 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1066 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1067 return;
1068 }
1069 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1070 storeGroupDescriptionInternal(groupOp.groupDesc());
1071 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1072 updateGroupDescriptionInternal(groupOp.deviceId(),
1073 groupOp.appCookie(),
1074 groupOp.updateType(),
1075 groupOp.updateBuckets(),
1076 groupOp.newAppCookie());
1077 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1078 deleteGroupDescriptionInternal(groupOp.deviceId(),
1079 groupOp.appCookie());
1080 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001081 }
1082
1083 /**
1084 * Flattened map key to be used to store group entries.
1085 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001086 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001087 private final DeviceId deviceId;
1088
1089 public GroupStoreMapKey(DeviceId deviceId) {
1090 this.deviceId = deviceId;
1091 }
1092
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001093 public DeviceId deviceId() {
1094 return deviceId;
1095 }
1096
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001097 @Override
1098 public boolean equals(Object o) {
1099 if (this == o) {
1100 return true;
1101 }
1102 if (!(o instanceof GroupStoreMapKey)) {
1103 return false;
1104 }
1105 GroupStoreMapKey that = (GroupStoreMapKey) o;
1106 return this.deviceId.equals(that.deviceId);
1107 }
1108
1109 @Override
1110 public int hashCode() {
1111 int result = 17;
1112
1113 result = 31 * result + Objects.hash(this.deviceId);
1114
1115 return result;
1116 }
1117 }
1118
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001119 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001120 private final GroupKey appCookie;
1121 public GroupStoreKeyMapKey(DeviceId deviceId,
1122 GroupKey appCookie) {
1123 super(deviceId);
1124 this.appCookie = appCookie;
1125 }
1126
1127 @Override
1128 public boolean equals(Object o) {
1129 if (this == o) {
1130 return true;
1131 }
1132 if (!(o instanceof GroupStoreKeyMapKey)) {
1133 return false;
1134 }
1135 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1136 return (super.equals(that) &&
1137 this.appCookie.equals(that.appCookie));
1138 }
1139
1140 @Override
1141 public int hashCode() {
1142 int result = 17;
1143
1144 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1145
1146 return result;
1147 }
1148 }
1149
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001150 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001151 private final GroupId groupId;
1152 public GroupStoreIdMapKey(DeviceId deviceId,
1153 GroupId groupId) {
1154 super(deviceId);
1155 this.groupId = groupId;
1156 }
1157
1158 @Override
1159 public boolean equals(Object o) {
1160 if (this == o) {
1161 return true;
1162 }
1163 if (!(o instanceof GroupStoreIdMapKey)) {
1164 return false;
1165 }
1166 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1167 return (super.equals(that) &&
1168 this.groupId.equals(that.groupId));
1169 }
1170
1171 @Override
1172 public int hashCode() {
1173 int result = 17;
1174
1175 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1176
1177 return result;
1178 }
1179 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001180
1181 @Override
1182 public void pushGroupMetrics(DeviceId deviceId,
1183 Collection<Group> groupEntries) {
1184 boolean deviceInitialAuditStatus =
1185 deviceInitialAuditStatus(deviceId);
1186 Set<Group> southboundGroupEntries =
1187 Sets.newHashSet(groupEntries);
1188 Set<StoredGroupEntry> storedGroupEntries =
1189 Sets.newHashSet(getStoredGroups(deviceId));
1190 Set<Group> extraneousStoredEntries =
1191 Sets.newHashSet(getExtraneousGroups(deviceId));
1192
1193 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1194 southboundGroupEntries.size(),
1195 deviceId);
1196 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1197 Group group = it.next();
1198 log.trace("Group {} in device {}", group, deviceId);
1199 }
1200
1201 log.trace("Displaying all ({}) stored group entries for device {}",
1202 storedGroupEntries.size(),
1203 deviceId);
1204 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1205 it1.hasNext();) {
1206 Group group = it1.next();
1207 log.trace("Stored Group {} for device {}", group, deviceId);
1208 }
1209
1210 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1211 Group group = it2.next();
1212 if (storedGroupEntries.remove(group)) {
1213 // we both have the group, let's update some info then.
1214 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1215 group.id(), deviceId);
1216 groupAdded(group);
1217 it2.remove();
1218 }
1219 }
1220 for (Group group : southboundGroupEntries) {
1221 if (getGroup(group.deviceId(), group.id()) != null) {
1222 // There is a group existing with the same id
1223 // It is possible that group update is
1224 // in progress while we got a stale info from switch
1225 if (!storedGroupEntries.remove(getGroup(
1226 group.deviceId(), group.id()))) {
1227 log.warn("Group AUDIT: Inconsistent state:"
1228 + "Group exists in ID based table while "
1229 + "not present in key based table");
1230 }
1231 } else {
1232 // there are groups in the switch that aren't in the store
1233 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1234 group.id(), deviceId);
1235 extraneousStoredEntries.remove(group);
1236 extraneousGroup(group);
1237 }
1238 }
1239 for (Group group : storedGroupEntries) {
1240 // there are groups in the store that aren't in the switch
1241 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1242 group.id(), deviceId);
1243 groupMissing(group);
1244 }
1245 for (Group group : extraneousStoredEntries) {
1246 // there are groups in the extraneous store that
1247 // aren't in the switch
1248 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1249 group.id(), deviceId);
1250 removeExtraneousGroupEntry(group);
1251 }
1252
1253 if (!deviceInitialAuditStatus) {
1254 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1255 deviceId);
1256 deviceInitialAuditCompleted(deviceId, true);
1257 }
1258 }
1259
1260 private void groupMissing(Group group) {
1261 switch (group.state()) {
1262 case PENDING_DELETE:
1263 log.debug("Group {} delete confirmation from device {}",
1264 group, group.deviceId());
1265 removeGroupEntry(group);
1266 break;
1267 case ADDED:
1268 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001269 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001270 case PENDING_UPDATE:
1271 log.debug("Group {} is in store but not on device {}",
1272 group, group.deviceId());
1273 StoredGroupEntry existing =
1274 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001275 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001276 existing.id(),
1277 existing.deviceId(),
1278 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001279 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001280 //Re-PUT map entries to trigger map update events
1281 getGroupStoreKeyMap().
1282 put(new GroupStoreKeyMapKey(existing.deviceId(),
1283 existing.appCookie()), existing);
1284 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1285 group));
1286 break;
1287 default:
1288 log.debug("Group {} has not been installed.", group);
1289 break;
1290 }
1291 }
1292
1293 private void extraneousGroup(Group group) {
1294 log.debug("Group {} is on device {} but not in store.",
1295 group, group.deviceId());
1296 addOrUpdateExtraneousGroupEntry(group);
1297 }
1298
1299 private void groupAdded(Group group) {
1300 log.trace("Group {} Added or Updated in device {}",
1301 group, group.deviceId());
1302 addOrUpdateGroupEntry(group);
1303 }
alshabib10580802015-02-18 18:30:33 -08001304}