blob: e91031a0ed0984219803880822c818f2a1d4a6bd [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.Timestamp;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.cluster.messaging.ClusterMessage;
65import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jonathan Hart63939a32015-05-08 11:57:03 -070066import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070068import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070069import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import org.onosproject.store.service.ClockService;
71import org.onosproject.store.service.EventuallyConsistentMap;
72import org.onosproject.store.service.EventuallyConsistentMapBuilder;
73import org.onosproject.store.service.EventuallyConsistentMapEvent;
74import org.onosproject.store.service.EventuallyConsistentMapListener;
75import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080076import org.slf4j.Logger;
77
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.net.URI;
79import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070080import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070082import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070083import java.util.List;
84import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070085import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070086import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070087import java.util.concurrent.ConcurrentHashMap;
88import java.util.concurrent.ConcurrentMap;
89import java.util.concurrent.ExecutorService;
90import java.util.concurrent.Executors;
91import java.util.concurrent.atomic.AtomicInteger;
92import java.util.concurrent.atomic.AtomicLong;
93import java.util.stream.Collectors;
94
95import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
96import static org.onlab.util.Tools.groupedThreads;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080098
99/**
100 * Manages inventory of group entries using trivial in-memory implementation.
101 */
102@Component(immediate = true)
103@Service
104public class DistributedGroupStore
105 extends AbstractStore<GroupEvent, GroupStoreDelegate>
106 implements GroupStore {
107
108 private final Logger log = getLogger(getClass());
109
110 private final int dummyId = 0xffffffff;
111 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
112
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterCommunicationService clusterCommunicator;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterService clusterService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700120 protected StorageService storageService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700123 protected MastershipService mastershipService;
124
125 // Per device group table with (device id + app cookie) as key
126 private EventuallyConsistentMap<GroupStoreKeyMapKey,
127 StoredGroupEntry> groupStoreEntriesByKey = null;
128 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
130 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700131 private EventuallyConsistentMap<GroupStoreKeyMapKey,
132 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800133 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
134 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 private ExecutorService messageHandlingExecutor;
136 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800137
138 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
139 new HashMap<DeviceId, Boolean>();
140
141 private final AtomicInteger groupIdGen = new AtomicInteger();
142
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700143 private KryoNamespace.Builder kryoBuilder = null;
144
alshabib10580802015-02-18 18:30:33 -0800145 @Activate
146 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 kryoBuilder = new KryoNamespace.Builder()
148 .register(DefaultGroup.class,
149 DefaultGroupBucket.class,
150 DefaultGroupDescription.class,
151 DefaultGroupKey.class,
152 GroupDescription.Type.class,
153 Group.GroupState.class,
154 GroupBuckets.class,
155 DefaultGroupId.class,
156 GroupStoreMessage.class,
157 GroupStoreMessage.Type.class,
158 UpdateType.class,
159 GroupStoreMessageSubjects.class,
160 MultiValuedTimestamp.class,
161 GroupStoreKeyMapKey.class,
162 GroupStoreIdMapKey.class,
163 GroupStoreMapKey.class
164 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700165 .register(new URISerializer(), URI.class)
166 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700167 .register(PortNumber.class)
168 .register(DefaultApplicationId.class)
169 .register(DefaultTrafficTreatment.class,
170 Instructions.DropInstruction.class,
171 Instructions.OutputInstruction.class,
172 Instructions.GroupInstruction.class,
173 Instructions.TableTypeTransition.class,
174 FlowRule.Type.class,
175 L0ModificationInstruction.class,
176 L0ModificationInstruction.L0SubType.class,
177 L0ModificationInstruction.ModLambdaInstruction.class,
178 L2ModificationInstruction.class,
179 L2ModificationInstruction.L2SubType.class,
180 L2ModificationInstruction.ModEtherInstruction.class,
181 L2ModificationInstruction.PushHeaderInstructions.class,
182 L2ModificationInstruction.ModVlanIdInstruction.class,
183 L2ModificationInstruction.ModVlanPcpInstruction.class,
184 L2ModificationInstruction.ModMplsLabelInstruction.class,
185 L2ModificationInstruction.ModMplsTtlInstruction.class,
186 L3ModificationInstruction.class,
187 L3ModificationInstruction.L3SubType.class,
188 L3ModificationInstruction.ModIPInstruction.class,
189 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
190 L3ModificationInstruction.ModTtlInstruction.class,
191 org.onlab.packet.MplsLabel.class
192 )
193 .register(org.onosproject.cluster.NodeId.class)
194 .register(KryoNamespaces.BASIC)
195 .register(KryoNamespaces.MISC);
196
197 messageHandlingExecutor = Executors.
198 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
199 groupedThreads("onos/store/group",
200 "message-handlers"));
201 clusterCommunicator.
202 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
203 new ClusterGroupMsgHandler(),
204 messageHandlingExecutor);
205
206 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700207 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
208 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
209
210 groupStoreEntriesByKey = keyMapBuilder
211 .withName("groupstorekeymap")
212 .withSerializer(kryoBuilder)
213 .withClockService(new GroupStoreLogicalClockManager<>())
214 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700215 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700216 log.debug("Current size of groupstorekeymap:{}",
217 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700219 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700220 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
221 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
222
223 auditPendingReqQueue = auditMapBuilder
224 .withName("pendinggroupkeymap")
225 .withSerializer(kryoBuilder)
226 .withClockService(new GroupStoreLogicalClockManager<>())
227 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700228 log.debug("Current size of pendinggroupkeymap:{}",
229 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230
alshabib10580802015-02-18 18:30:33 -0800231 log.info("Started");
232 }
233
234 @Deactivate
235 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700236 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800238 log.info("Stopped");
239 }
240
alshabib10580802015-02-18 18:30:33 -0800241 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800243 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
244 }
245
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700246 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
247 lazyEmptyGroupIdTable() {
248 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
249 }
250
alshabib10580802015-02-18 18:30:33 -0800251 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700252 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800253 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700254 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
257 getGroupStoreKeyMap() {
258 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700262 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700264 * @param deviceId identifier of the device
265 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800266 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700267 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
268 return createIfAbsentUnchecked(groupEntriesById,
269 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800270 }
271
272 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700273 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800274 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800276 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700277 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
278 getPendingGroupKeyTable() {
279 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800280 }
281
282 /**
283 * Returns the extraneous group id table for specified device.
284 *
285 * @param deviceId identifier of the device
286 * @return Map representing group key table of given device.
287 */
288 private ConcurrentMap<GroupId, Group>
289 getExtraneousGroupIdTable(DeviceId deviceId) {
290 return createIfAbsentUnchecked(extraneousGroupEntriesById,
291 deviceId,
292 lazyEmptyExtraneousGroupIdTable());
293 }
294
295 /**
296 * Returns the number of groups for the specified device in the store.
297 *
298 * @return number of groups for the specified device
299 */
300 @Override
301 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 return (getGroups(deviceId) != null) ?
303 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800304 }
305
306 /**
307 * Returns the groups associated with a device.
308 *
309 * @param deviceId the device ID
310 *
311 * @return the group entries
312 */
313 @Override
314 public Iterable<Group> getGroups(DeviceId deviceId) {
315 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700316 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700317 deviceId, getGroupStoreKeyMap().values().size());
318 return FluentIterable.from(getGroupStoreKeyMap().values())
319 .filter(input -> input.deviceId().equals(deviceId))
320 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800321 }
322
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700323 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
324 // flatten and make iterator unmodifiable
325 log.debug("getGroups: for device {} total number of groups {}",
326 deviceId, getGroupStoreKeyMap().values().size());
327 return FluentIterable.from(getGroupStoreKeyMap().values())
328 .filter(input -> input.deviceId().equals(deviceId));
329 }
330
alshabib10580802015-02-18 18:30:33 -0800331 /**
332 * Returns the stored group entry.
333 *
334 * @param deviceId the device ID
335 * @param appCookie the group key
336 *
337 * @return a group associated with the key
338 */
339 @Override
340 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700341 return getStoredGroupEntry(deviceId, appCookie);
342 }
343
344 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
345 GroupKey appCookie) {
346 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
347 appCookie));
348 }
349
350 @Override
351 public Group getGroup(DeviceId deviceId, GroupId groupId) {
352 return getStoredGroupEntry(deviceId, groupId);
353 }
354
355 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
356 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700357 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800358 }
359
360 private int getFreeGroupIdValue(DeviceId deviceId) {
361 int freeId = groupIdGen.incrementAndGet();
362
363 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700364 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800365 if (existing == null) {
366 existing = (
367 extraneousGroupEntriesById.get(deviceId) != null) ?
368 extraneousGroupEntriesById.get(deviceId).
369 get(new DefaultGroupId(freeId)) :
370 null;
371 }
372 if (existing != null) {
373 freeId = groupIdGen.incrementAndGet();
374 } else {
375 break;
376 }
377 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700378 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800379 return freeId;
380 }
381
382 /**
383 * Stores a new group entry using the information from group description.
384 *
385 * @param groupDesc group description to be used to create group entry
386 */
387 @Override
388 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700389 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800390 // Check if a group is existing with the same key
391 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 log.warn("Group already exists with the same key {}",
393 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800394 return;
395 }
396
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700397 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700398 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700399 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700400 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700401 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
402 log.error("No Master for device {}..."
403 + "Can not perform add group operation",
404 groupDesc.deviceId());
405 //TODO: Send Group operation failure event
406 return;
407 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700408 GroupStoreMessage groupOp = GroupStoreMessage.
409 createGroupAddRequestMsg(groupDesc.deviceId(),
410 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700411
Madan Jampani175e8fd2015-05-20 14:10:45 -0700412 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700413 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
414 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700415 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
416 if (error != null) {
417 log.warn("Failed to send request to master: {} to {}",
418 groupOp,
419 mastershipService.getMasterFor(groupDesc.deviceId()));
420 //TODO: Send Group operation failure event
421 } else {
422 log.debug("Sent Group operation request for device {} "
423 + "to remote MASTER {}",
424 groupDesc.deviceId(),
425 mastershipService.getMasterFor(groupDesc.deviceId()));
426 }
427 });
alshabib10580802015-02-18 18:30:33 -0800428 return;
429 }
430
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700431 log.debug("Store group for device {} is getting handled locally",
432 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800433 storeGroupDescriptionInternal(groupDesc);
434 }
435
436 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
437 // Check if a group is existing with the same key
438 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
439 return;
440 }
441
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700442 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
443 // Device group audit has not completed yet
444 // Add this group description to pending group key table
445 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700446 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700447 groupDesc.deviceId());
448 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
449 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
450 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
451 getPendingGroupKeyTable();
452 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
453 groupDesc.appCookie()),
454 group);
455 return;
456 }
457
Saurav Das100e3b82015-04-30 11:12:10 -0700458 GroupId id = null;
459 if (groupDesc.givenGroupId() == null) {
460 // Get a new group identifier
461 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
462 } else {
463 id = new DefaultGroupId(groupDesc.givenGroupId());
464 }
alshabib10580802015-02-18 18:30:33 -0800465 // Create a group entry object
466 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700467 // Insert the newly created group entry into key and id maps
468 getGroupStoreKeyMap().
469 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
470 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700471 // Ensure it also inserted into group id based table to
472 // avoid any chances of duplication in group id generation
473 getGroupIdTable(groupDesc.deviceId()).
474 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700475 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
476 id,
477 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800478 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
479 group));
480 }
481
482 /**
483 * Updates the existing group entry with the information
484 * from group description.
485 *
486 * @param deviceId the device ID
487 * @param oldAppCookie the current group key
488 * @param type update type
489 * @param newBuckets group buckets for updates
490 * @param newAppCookie optional new group key
491 */
492 @Override
493 public void updateGroupDescription(DeviceId deviceId,
494 GroupKey oldAppCookie,
495 UpdateType type,
496 GroupBuckets newBuckets,
497 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700498 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700499 if (mastershipService.getMasterFor(deviceId) != null &&
500 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700501 log.debug("updateGroupDescription: Device {} local role is not MASTER",
502 deviceId);
503 if (mastershipService.getMasterFor(deviceId) == null) {
504 log.error("No Master for device {}..."
505 + "Can not perform update group operation",
506 deviceId);
507 //TODO: Send Group operation failure event
508 return;
509 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700510 GroupStoreMessage groupOp = GroupStoreMessage.
511 createGroupUpdateRequestMsg(deviceId,
512 oldAppCookie,
513 type,
514 newBuckets,
515 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700516
Madan Jampani175e8fd2015-05-20 14:10:45 -0700517 clusterCommunicator.unicast(groupOp,
518 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
519 m -> kryoBuilder.build().serialize(m),
520 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
521 if (error != null) {
522 log.warn("Failed to send request to master: {} to {}",
523 groupOp,
524 mastershipService.getMasterFor(deviceId), error);
525 }
526 //TODO: Send Group operation failure event
527 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700528 return;
529 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700530 log.debug("updateGroupDescription for device {} is getting handled locally",
531 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700532 updateGroupDescriptionInternal(deviceId,
533 oldAppCookie,
534 type,
535 newBuckets,
536 newAppCookie);
537 }
538
539 private void updateGroupDescriptionInternal(DeviceId deviceId,
540 GroupKey oldAppCookie,
541 UpdateType type,
542 GroupBuckets newBuckets,
543 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800544 // Check if a group is existing with the provided key
545 Group oldGroup = getGroup(deviceId, oldAppCookie);
546 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700547 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800548 return;
549 }
550
551 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
552 type,
553 newBuckets);
554 if (newBucketList != null) {
555 // Create a new group object from the old group
556 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
557 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
558 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
559 oldGroup.deviceId(),
560 oldGroup.type(),
561 updatedBuckets,
562 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700563 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800564 oldGroup.appId());
565 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
566 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700567 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
568 oldGroup.id(),
569 oldGroup.deviceId(),
570 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800571 newGroup.setState(GroupState.PENDING_UPDATE);
572 newGroup.setLife(oldGroup.life());
573 newGroup.setPackets(oldGroup.packets());
574 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700575 //Update the group entry in groupkey based map.
576 //Update to groupid based map will happen in the
577 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700578 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
579 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700580 getGroupStoreKeyMap().
581 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
582 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800583 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700584 } else {
585 log.warn("updateGroupDescriptionInternal with type {}: No "
586 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800587 }
588 }
589
590 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
591 UpdateType type,
592 GroupBuckets buckets) {
593 GroupBuckets oldBuckets = oldGroup.buckets();
594 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
595 oldBuckets.buckets());
596 boolean groupDescUpdated = false;
597
598 if (type == UpdateType.ADD) {
599 // Check if the any of the new buckets are part of
600 // the old bucket list
601 for (GroupBucket addBucket:buckets.buckets()) {
602 if (!newBucketList.contains(addBucket)) {
603 newBucketList.add(addBucket);
604 groupDescUpdated = true;
605 }
606 }
607 } else if (type == UpdateType.REMOVE) {
608 // Check if the to be removed buckets are part of the
609 // old bucket list
610 for (GroupBucket removeBucket:buckets.buckets()) {
611 if (newBucketList.contains(removeBucket)) {
612 newBucketList.remove(removeBucket);
613 groupDescUpdated = true;
614 }
615 }
616 }
617
618 if (groupDescUpdated) {
619 return newBucketList;
620 } else {
621 return null;
622 }
623 }
624
625 /**
626 * Triggers deleting the existing group entry.
627 *
628 * @param deviceId the device ID
629 * @param appCookie the group key
630 */
631 @Override
632 public void deleteGroupDescription(DeviceId deviceId,
633 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700634 // Check if group to be deleted by a remote instance
635 if (mastershipService.
636 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700637 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
638 deviceId);
639 if (mastershipService.getMasterFor(deviceId) == null) {
640 log.error("No Master for device {}..."
641 + "Can not perform delete group operation",
642 deviceId);
643 //TODO: Send Group operation failure event
644 return;
645 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700646 GroupStoreMessage groupOp = GroupStoreMessage.
647 createGroupDeleteRequestMsg(deviceId,
648 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700649
Madan Jampani175e8fd2015-05-20 14:10:45 -0700650 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700651 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
652 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700653 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
654 if (error != null) {
655 log.warn("Failed to send request to master: {} to {}",
656 groupOp,
657 mastershipService.getMasterFor(deviceId), error);
658 }
659 //TODO: Send Group operation failure event
660 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700661 return;
662 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700663 log.debug("deleteGroupDescription in device {} is getting handled locally",
664 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700665 deleteGroupDescriptionInternal(deviceId, appCookie);
666 }
667
668 private void deleteGroupDescriptionInternal(DeviceId deviceId,
669 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800670 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700671 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800672 if (existing == null) {
673 return;
674 }
675
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700676 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
677 existing.id(),
678 existing.deviceId(),
679 existing.state());
alshabib10580802015-02-18 18:30:33 -0800680 synchronized (existing) {
681 existing.setState(GroupState.PENDING_DELETE);
682 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700683 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
684 deviceId);
alshabib10580802015-02-18 18:30:33 -0800685 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
686 }
687
688 /**
689 * Stores a new group entry, or updates an existing entry.
690 *
691 * @param group group entry
692 */
693 @Override
694 public void addOrUpdateGroupEntry(Group group) {
695 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700696 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
697 group.id());
alshabib10580802015-02-18 18:30:33 -0800698 GroupEvent event = null;
699
700 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700701 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700702 group.id(),
703 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800704 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700705 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700706 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700707 existing.buckets().buckets()
708 .stream()
709 .filter((existingBucket)->(existingBucket.equals(bucket)))
710 .findFirst();
711 if (matchingBucket.isPresent()) {
712 ((StoredGroupBucketEntry) matchingBucket.
713 get()).setPackets(bucket.packets());
714 ((StoredGroupBucketEntry) matchingBucket.
715 get()).setBytes(bucket.bytes());
716 } else {
717 log.warn("addOrUpdateGroupEntry: No matching "
718 + "buckets to update stats");
719 }
720 }
alshabib10580802015-02-18 18:30:33 -0800721 existing.setLife(group.life());
722 existing.setPackets(group.packets());
723 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700724 if ((existing.state() == GroupState.PENDING_ADD) ||
725 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700726 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
727 existing.id(),
728 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700729 existing.state());
alshabib10580802015-02-18 18:30:33 -0800730 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700731 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800732 event = new GroupEvent(Type.GROUP_ADDED, existing);
733 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700734 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
735 existing.id(),
736 existing.deviceId(),
737 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700738 existing.setState(GroupState.ADDED);
739 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800740 event = new GroupEvent(Type.GROUP_UPDATED, existing);
741 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700742 //Re-PUT map entries to trigger map update events
743 getGroupStoreKeyMap().
744 put(new GroupStoreKeyMapKey(existing.deviceId(),
745 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800746 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700747 } else {
748 log.warn("addOrUpdateGroupEntry: Group update "
749 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800750 }
751
752 if (event != null) {
753 notifyDelegate(event);
754 }
755 }
756
757 /**
758 * Removes the group entry from store.
759 *
760 * @param group group entry
761 */
762 @Override
763 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700764 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
765 group.id());
alshabib10580802015-02-18 18:30:33 -0800766
767 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700768 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700769 group.id(),
770 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700771 //Removal from groupid based map will happen in the
772 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700773 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
774 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800775 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700776 } else {
777 log.warn("removeGroupEntry for {} in device{} is "
778 + "not existing in our maps",
779 group.id(),
780 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800781 }
782 }
783
784 @Override
785 public void deviceInitialAuditCompleted(DeviceId deviceId,
786 boolean completed) {
787 synchronized (deviceAuditStatus) {
788 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700789 log.debug("AUDIT completed for device {}",
790 deviceId);
alshabib10580802015-02-18 18:30:33 -0800791 deviceAuditStatus.put(deviceId, true);
792 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700793 List<StoredGroupEntry> pendingGroupRequests =
794 getPendingGroupKeyTable().values()
795 .stream()
796 .filter(g-> g.deviceId().equals(deviceId))
797 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700798 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700799 deviceId,
800 pendingGroupRequests.size());
801 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800802 GroupDescription tmp = new DefaultGroupDescription(
803 group.deviceId(),
804 group.type(),
805 group.buckets(),
806 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700807 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800808 group.appId());
809 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700810 getPendingGroupKeyTable().
811 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800812 }
alshabib10580802015-02-18 18:30:33 -0800813 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700814 Boolean audited = deviceAuditStatus.get(deviceId);
815 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700816 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800817 deviceAuditStatus.put(deviceId, false);
818 }
819 }
820 }
821 }
822
823 @Override
824 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
825 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700826 Boolean audited = deviceAuditStatus.get(deviceId);
827 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800828 }
829 }
830
831 @Override
832 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
833
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700834 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
835 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800836
837 if (existing == null) {
838 log.warn("No group entry with ID {} found ", operation.groupId());
839 return;
840 }
841
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700842 log.warn("groupOperationFailed: group operation {} failed"
843 + "for group {} in device {}",
844 operation.opType(),
845 existing.id(),
846 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800847 switch (operation.opType()) {
848 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700849 if (existing.state() == GroupState.PENDING_ADD) {
850 //TODO: Need to add support for passing the group
851 //operation failure reason from group provider.
852 //If the error type is anything other than GROUP_EXISTS,
853 //then the GROUP_ADD_FAILED event should be raised even
854 //in PENDING_ADD_RETRY state also.
855 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
856 log.warn("groupOperationFailed: cleaningup "
857 + "group {} from store in device {}....",
858 existing.id(),
859 existing.deviceId());
860 //Removal from groupid based map will happen in the
861 //map update listener
862 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
863 existing.appCookie()));
864 }
alshabib10580802015-02-18 18:30:33 -0800865 break;
866 case MODIFY:
867 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
868 break;
869 case DELETE:
870 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
871 break;
872 default:
873 log.warn("Unknown group operation type {}", operation.opType());
874 }
alshabib10580802015-02-18 18:30:33 -0800875 }
876
877 @Override
878 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700879 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700880 group.id(),
881 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800882 ConcurrentMap<GroupId, Group> extraneousIdTable =
883 getExtraneousGroupIdTable(group.deviceId());
884 extraneousIdTable.put(group.id(), group);
885 // Check the reference counter
886 if (group.referenceCount() == 0) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700887 log.debug("Flow reference counter is zero and triggering remove",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700888 group.id(),
889 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800890 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
891 }
892 }
893
894 @Override
895 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700896 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700897 group.id(),
898 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800899 ConcurrentMap<GroupId, Group> extraneousIdTable =
900 getExtraneousGroupIdTable(group.deviceId());
901 extraneousIdTable.remove(group.id());
902 }
903
904 @Override
905 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
906 // flatten and make iterator unmodifiable
907 return FluentIterable.from(
908 getExtraneousGroupIdTable(deviceId).values());
909 }
910
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700911 /**
912 * ClockService that generates wallclock based timestamps.
913 */
914 private class GroupStoreLogicalClockManager<T, U>
915 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800916
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700917 private final AtomicLong sequenceNumber = new AtomicLong(0);
918
919 @Override
920 public Timestamp getTimestamp(T t1, U u1) {
921 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
922 sequenceNumber.getAndIncrement());
923 }
924 }
925
926 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700927 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700928 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700929 private class GroupStoreKeyMapListener implements
930 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700931
932 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700933 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700934 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700935 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700936 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700937 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700938 if ((key == null) && (group == null)) {
939 log.error("GroupStoreKeyMapListener: Received "
940 + "event {} with null entry", mapEvent.type());
941 return;
942 } else if (group == null) {
943 group = getGroupIdTable(key.deviceId()).values()
944 .stream()
945 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
946 .findFirst().get();
947 if (group == null) {
948 log.error("GroupStoreKeyMapListener: Received "
949 + "event {} with null entry... can not process", mapEvent.type());
950 return;
951 }
952 }
953 log.trace("received groupid map event {} for id {} in device {}",
954 mapEvent.type(),
955 group.id(),
956 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700957 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700958 // Update the group ID table
959 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700960 if (mapEvent.value().state() == Group.GroupState.ADDED) {
961 if (mapEvent.value().isGroupStateAddedFirstTime()) {
962 groupEvent = new GroupEvent(Type.GROUP_ADDED,
963 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700964 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
965 group.id(),
966 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700967 } else {
968 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
969 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700970 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
971 group.id(),
972 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700973 }
974 }
975 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700976 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700977 // Remove the entry from the group ID table
978 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700979 }
980
981 if (groupEvent != null) {
982 notifyDelegate(groupEvent);
983 }
984 }
985 }
986 /**
987 * Message handler to receive messages from group subsystems of
988 * other cluster members.
989 */
990 private final class ClusterGroupMsgHandler
991 implements ClusterMessageHandler {
992 @Override
993 public void handle(ClusterMessage message) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700994 if (message.subject().equals(
995 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700996 GroupStoreMessage groupOp = kryoBuilder.
997 build().deserialize(message.payload());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700998 log.debug("received remote group operation {} request for device {}",
999 groupOp.type(),
1000 groupOp.deviceId());
1001 if (mastershipService.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001002 getLocalRole(groupOp.deviceId()) !=
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001003 MastershipRole.MASTER) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001004 log.warn("ClusterGroupMsgHandler: This node is not "
1005 + "MASTER for device {}", groupOp.deviceId());
1006 return;
1007 }
1008 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001009 storeGroupDescriptionInternal(groupOp.groupDesc());
1010 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001011 updateGroupDescriptionInternal(groupOp.deviceId(),
1012 groupOp.appCookie(),
1013 groupOp.updateType(),
1014 groupOp.updateBuckets(),
1015 groupOp.newAppCookie());
1016 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001017 deleteGroupDescriptionInternal(groupOp.deviceId(),
1018 groupOp.appCookie());
1019 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001020 } else {
1021 log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
1022 message.subject());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001023 }
1024 }
1025 }
1026
1027 /**
1028 * Flattened map key to be used to store group entries.
1029 */
1030 private class GroupStoreMapKey {
1031 private final DeviceId deviceId;
1032
1033 public GroupStoreMapKey(DeviceId deviceId) {
1034 this.deviceId = deviceId;
1035 }
1036
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001037 public DeviceId deviceId() {
1038 return deviceId;
1039 }
1040
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001041 @Override
1042 public boolean equals(Object o) {
1043 if (this == o) {
1044 return true;
1045 }
1046 if (!(o instanceof GroupStoreMapKey)) {
1047 return false;
1048 }
1049 GroupStoreMapKey that = (GroupStoreMapKey) o;
1050 return this.deviceId.equals(that.deviceId);
1051 }
1052
1053 @Override
1054 public int hashCode() {
1055 int result = 17;
1056
1057 result = 31 * result + Objects.hash(this.deviceId);
1058
1059 return result;
1060 }
1061 }
1062
1063 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
1064 private final GroupKey appCookie;
1065 public GroupStoreKeyMapKey(DeviceId deviceId,
1066 GroupKey appCookie) {
1067 super(deviceId);
1068 this.appCookie = appCookie;
1069 }
1070
1071 @Override
1072 public boolean equals(Object o) {
1073 if (this == o) {
1074 return true;
1075 }
1076 if (!(o instanceof GroupStoreKeyMapKey)) {
1077 return false;
1078 }
1079 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1080 return (super.equals(that) &&
1081 this.appCookie.equals(that.appCookie));
1082 }
1083
1084 @Override
1085 public int hashCode() {
1086 int result = 17;
1087
1088 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1089
1090 return result;
1091 }
1092 }
1093
1094 private class GroupStoreIdMapKey extends GroupStoreMapKey {
1095 private final GroupId groupId;
1096 public GroupStoreIdMapKey(DeviceId deviceId,
1097 GroupId groupId) {
1098 super(deviceId);
1099 this.groupId = groupId;
1100 }
1101
1102 @Override
1103 public boolean equals(Object o) {
1104 if (this == o) {
1105 return true;
1106 }
1107 if (!(o instanceof GroupStoreIdMapKey)) {
1108 return false;
1109 }
1110 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1111 return (super.equals(that) &&
1112 this.groupId.equals(that.groupId));
1113 }
1114
1115 @Override
1116 public int hashCode() {
1117 int result = 17;
1118
1119 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1120
1121 return result;
1122 }
1123 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001124
1125 @Override
1126 public void pushGroupMetrics(DeviceId deviceId,
1127 Collection<Group> groupEntries) {
1128 boolean deviceInitialAuditStatus =
1129 deviceInitialAuditStatus(deviceId);
1130 Set<Group> southboundGroupEntries =
1131 Sets.newHashSet(groupEntries);
1132 Set<StoredGroupEntry> storedGroupEntries =
1133 Sets.newHashSet(getStoredGroups(deviceId));
1134 Set<Group> extraneousStoredEntries =
1135 Sets.newHashSet(getExtraneousGroups(deviceId));
1136
1137 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1138 southboundGroupEntries.size(),
1139 deviceId);
1140 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1141 Group group = it.next();
1142 log.trace("Group {} in device {}", group, deviceId);
1143 }
1144
1145 log.trace("Displaying all ({}) stored group entries for device {}",
1146 storedGroupEntries.size(),
1147 deviceId);
1148 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1149 it1.hasNext();) {
1150 Group group = it1.next();
1151 log.trace("Stored Group {} for device {}", group, deviceId);
1152 }
1153
1154 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1155 Group group = it2.next();
1156 if (storedGroupEntries.remove(group)) {
1157 // we both have the group, let's update some info then.
1158 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1159 group.id(), deviceId);
1160 groupAdded(group);
1161 it2.remove();
1162 }
1163 }
1164 for (Group group : southboundGroupEntries) {
1165 if (getGroup(group.deviceId(), group.id()) != null) {
1166 // There is a group existing with the same id
1167 // It is possible that group update is
1168 // in progress while we got a stale info from switch
1169 if (!storedGroupEntries.remove(getGroup(
1170 group.deviceId(), group.id()))) {
1171 log.warn("Group AUDIT: Inconsistent state:"
1172 + "Group exists in ID based table while "
1173 + "not present in key based table");
1174 }
1175 } else {
1176 // there are groups in the switch that aren't in the store
1177 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1178 group.id(), deviceId);
1179 extraneousStoredEntries.remove(group);
1180 extraneousGroup(group);
1181 }
1182 }
1183 for (Group group : storedGroupEntries) {
1184 // there are groups in the store that aren't in the switch
1185 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1186 group.id(), deviceId);
1187 groupMissing(group);
1188 }
1189 for (Group group : extraneousStoredEntries) {
1190 // there are groups in the extraneous store that
1191 // aren't in the switch
1192 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1193 group.id(), deviceId);
1194 removeExtraneousGroupEntry(group);
1195 }
1196
1197 if (!deviceInitialAuditStatus) {
1198 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1199 deviceId);
1200 deviceInitialAuditCompleted(deviceId, true);
1201 }
1202 }
1203
1204 private void groupMissing(Group group) {
1205 switch (group.state()) {
1206 case PENDING_DELETE:
1207 log.debug("Group {} delete confirmation from device {}",
1208 group, group.deviceId());
1209 removeGroupEntry(group);
1210 break;
1211 case ADDED:
1212 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001213 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001214 case PENDING_UPDATE:
1215 log.debug("Group {} is in store but not on device {}",
1216 group, group.deviceId());
1217 StoredGroupEntry existing =
1218 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001219 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001220 existing.id(),
1221 existing.deviceId(),
1222 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001223 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001224 //Re-PUT map entries to trigger map update events
1225 getGroupStoreKeyMap().
1226 put(new GroupStoreKeyMapKey(existing.deviceId(),
1227 existing.appCookie()), existing);
1228 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1229 group));
1230 break;
1231 default:
1232 log.debug("Group {} has not been installed.", group);
1233 break;
1234 }
1235 }
1236
1237 private void extraneousGroup(Group group) {
1238 log.debug("Group {} is on device {} but not in store.",
1239 group, group.deviceId());
1240 addOrUpdateExtraneousGroupEntry(group);
1241 }
1242
1243 private void groupAdded(Group group) {
1244 log.trace("Group {} Added or Updated in device {}",
1245 group, group.deviceId());
1246 addOrUpdateGroupEntry(group);
1247 }
alshabib10580802015-02-18 18:30:33 -08001248}