blob: 3ebbf78a81b7efc03d20e6109cec51c344d49442 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.Timestamp;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.cluster.messaging.ClusterMessage;
65import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jonathan Hart63939a32015-05-08 11:57:03 -070066import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070068import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070069import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import org.onosproject.store.service.ClockService;
71import org.onosproject.store.service.EventuallyConsistentMap;
72import org.onosproject.store.service.EventuallyConsistentMapBuilder;
73import org.onosproject.store.service.EventuallyConsistentMapEvent;
74import org.onosproject.store.service.EventuallyConsistentMapListener;
75import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080076import org.slf4j.Logger;
77
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.net.URI;
79import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070080import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070082import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070083import java.util.List;
84import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070085import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070086import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070087import java.util.concurrent.ConcurrentHashMap;
88import java.util.concurrent.ConcurrentMap;
89import java.util.concurrent.ExecutorService;
90import java.util.concurrent.Executors;
91import java.util.concurrent.atomic.AtomicInteger;
92import java.util.concurrent.atomic.AtomicLong;
93import java.util.stream.Collectors;
94
95import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
96import static org.onlab.util.Tools.groupedThreads;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080098
99/**
 * Manages inventory of group entries using distributed, eventually consistent maps.
101 */
102@Component(immediate = true)
103@Service
104public class DistributedGroupStore
105 extends AbstractStore<GroupEvent, GroupStoreDelegate>
106 implements GroupStore {
107
108 private final Logger log = getLogger(getClass());
109
110 private final int dummyId = 0xffffffff;
111 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
112
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterCommunicationService clusterCommunicator;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterService clusterService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700120 protected StorageService storageService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700123 protected MastershipService mastershipService;
124
125 // Per device group table with (device id + app cookie) as key
126 private EventuallyConsistentMap<GroupStoreKeyMapKey,
127 StoredGroupEntry> groupStoreEntriesByKey = null;
128 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
130 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700131 private EventuallyConsistentMap<GroupStoreKeyMapKey,
132 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800133 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
134 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 private ExecutorService messageHandlingExecutor;
136 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800137
138 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
139 new HashMap<DeviceId, Boolean>();
140
141 private final AtomicInteger groupIdGen = new AtomicInteger();
142
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700143 private KryoNamespace.Builder kryoBuilder = null;
144
alshabib10580802015-02-18 18:30:33 -0800145 @Activate
146 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 kryoBuilder = new KryoNamespace.Builder()
148 .register(DefaultGroup.class,
149 DefaultGroupBucket.class,
150 DefaultGroupDescription.class,
151 DefaultGroupKey.class,
152 GroupDescription.Type.class,
153 Group.GroupState.class,
154 GroupBuckets.class,
155 DefaultGroupId.class,
156 GroupStoreMessage.class,
157 GroupStoreMessage.Type.class,
158 UpdateType.class,
159 GroupStoreMessageSubjects.class,
160 MultiValuedTimestamp.class,
161 GroupStoreKeyMapKey.class,
162 GroupStoreIdMapKey.class,
163 GroupStoreMapKey.class
164 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700165 .register(new URISerializer(), URI.class)
166 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700167 .register(PortNumber.class)
168 .register(DefaultApplicationId.class)
169 .register(DefaultTrafficTreatment.class,
170 Instructions.DropInstruction.class,
171 Instructions.OutputInstruction.class,
172 Instructions.GroupInstruction.class,
173 Instructions.TableTypeTransition.class,
174 FlowRule.Type.class,
175 L0ModificationInstruction.class,
176 L0ModificationInstruction.L0SubType.class,
177 L0ModificationInstruction.ModLambdaInstruction.class,
178 L2ModificationInstruction.class,
179 L2ModificationInstruction.L2SubType.class,
180 L2ModificationInstruction.ModEtherInstruction.class,
181 L2ModificationInstruction.PushHeaderInstructions.class,
182 L2ModificationInstruction.ModVlanIdInstruction.class,
183 L2ModificationInstruction.ModVlanPcpInstruction.class,
184 L2ModificationInstruction.ModMplsLabelInstruction.class,
185 L2ModificationInstruction.ModMplsTtlInstruction.class,
186 L3ModificationInstruction.class,
187 L3ModificationInstruction.L3SubType.class,
188 L3ModificationInstruction.ModIPInstruction.class,
189 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
190 L3ModificationInstruction.ModTtlInstruction.class,
191 org.onlab.packet.MplsLabel.class
192 )
193 .register(org.onosproject.cluster.NodeId.class)
194 .register(KryoNamespaces.BASIC)
195 .register(KryoNamespaces.MISC);
196
197 messageHandlingExecutor = Executors.
198 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
199 groupedThreads("onos/store/group",
200 "message-handlers"));
201 clusterCommunicator.
202 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
203 new ClusterGroupMsgHandler(),
204 messageHandlingExecutor);
205
206 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700207 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
208 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
209
210 groupStoreEntriesByKey = keyMapBuilder
211 .withName("groupstorekeymap")
212 .withSerializer(kryoBuilder)
213 .withClockService(new GroupStoreLogicalClockManager<>())
214 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700215 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700216 log.debug("Current size of groupstorekeymap:{}",
217 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700219 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700220 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
221 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
222
223 auditPendingReqQueue = auditMapBuilder
224 .withName("pendinggroupkeymap")
225 .withSerializer(kryoBuilder)
226 .withClockService(new GroupStoreLogicalClockManager<>())
227 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700228 log.debug("Current size of pendinggroupkeymap:{}",
229 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230
alshabib10580802015-02-18 18:30:33 -0800231 log.info("Started");
232 }
233
234 @Deactivate
235 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700236 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800238 log.info("Stopped");
239 }
240
alshabib10580802015-02-18 18:30:33 -0800241 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800243 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
244 }
245
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700246 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
247 lazyEmptyGroupIdTable() {
248 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
249 }
250
alshabib10580802015-02-18 18:30:33 -0800251 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700252 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800253 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700254 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
257 getGroupStoreKeyMap() {
258 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700262 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700264 * @param deviceId identifier of the device
265 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800266 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700267 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
268 return createIfAbsentUnchecked(groupEntriesById,
269 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800270 }
271
272 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700273 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800274 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800276 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700277 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
278 getPendingGroupKeyTable() {
279 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800280 }
281
282 /**
283 * Returns the extraneous group id table for specified device.
284 *
285 * @param deviceId identifier of the device
286 * @return Map representing group key table of given device.
287 */
288 private ConcurrentMap<GroupId, Group>
289 getExtraneousGroupIdTable(DeviceId deviceId) {
290 return createIfAbsentUnchecked(extraneousGroupEntriesById,
291 deviceId,
292 lazyEmptyExtraneousGroupIdTable());
293 }
294
295 /**
296 * Returns the number of groups for the specified device in the store.
297 *
298 * @return number of groups for the specified device
299 */
300 @Override
301 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 return (getGroups(deviceId) != null) ?
303 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800304 }
305
306 /**
307 * Returns the groups associated with a device.
308 *
309 * @param deviceId the device ID
310 *
311 * @return the group entries
312 */
313 @Override
314 public Iterable<Group> getGroups(DeviceId deviceId) {
315 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700316 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700317 deviceId, getGroupStoreKeyMap().values().size());
318 return FluentIterable.from(getGroupStoreKeyMap().values())
319 .filter(input -> input.deviceId().equals(deviceId))
320 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800321 }
322
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700323 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
324 // flatten and make iterator unmodifiable
325 log.debug("getGroups: for device {} total number of groups {}",
326 deviceId, getGroupStoreKeyMap().values().size());
327 return FluentIterable.from(getGroupStoreKeyMap().values())
328 .filter(input -> input.deviceId().equals(deviceId));
329 }
330
alshabib10580802015-02-18 18:30:33 -0800331 /**
332 * Returns the stored group entry.
333 *
334 * @param deviceId the device ID
335 * @param appCookie the group key
336 *
337 * @return a group associated with the key
338 */
339 @Override
340 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700341 return getStoredGroupEntry(deviceId, appCookie);
342 }
343
344 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
345 GroupKey appCookie) {
346 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
347 appCookie));
348 }
349
350 @Override
351 public Group getGroup(DeviceId deviceId, GroupId groupId) {
352 return getStoredGroupEntry(deviceId, groupId);
353 }
354
355 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
356 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700357 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800358 }
359
360 private int getFreeGroupIdValue(DeviceId deviceId) {
361 int freeId = groupIdGen.incrementAndGet();
362
363 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700364 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800365 if (existing == null) {
366 existing = (
367 extraneousGroupEntriesById.get(deviceId) != null) ?
368 extraneousGroupEntriesById.get(deviceId).
369 get(new DefaultGroupId(freeId)) :
370 null;
371 }
372 if (existing != null) {
373 freeId = groupIdGen.incrementAndGet();
374 } else {
375 break;
376 }
377 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700378 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800379 return freeId;
380 }
381
382 /**
383 * Stores a new group entry using the information from group description.
384 *
385 * @param groupDesc group description to be used to create group entry
386 */
387 @Override
388 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700389 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800390 // Check if a group is existing with the same key
391 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 log.warn("Group already exists with the same key {}",
393 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800394 return;
395 }
396
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700397 // Check if group to be created by a remote instance
398 if (mastershipService.getLocalRole(
399 groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700400 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700401 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700402 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
403 log.error("No Master for device {}..."
404 + "Can not perform add group operation",
405 groupDesc.deviceId());
406 //TODO: Send Group operation failure event
407 return;
408 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700409 GroupStoreMessage groupOp = GroupStoreMessage.
410 createGroupAddRequestMsg(groupDesc.deviceId(),
411 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700412
413 if (!clusterCommunicator.unicast(groupOp,
414 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
415 m -> kryoBuilder.build().serialize(m),
416 mastershipService.getMasterFor(groupDesc.deviceId()))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700417 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700418 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700419 mastershipService.getMasterFor(groupDesc.deviceId()));
420 //TODO: Send Group operation failure event
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700421 return;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700422 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700423 log.debug("Sent Group operation request for device {} to remote MASTER {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700424 groupDesc.deviceId(),
425 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800426 return;
427 }
428
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700429 log.debug("Store group for device {} is getting handled locally",
430 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800431 storeGroupDescriptionInternal(groupDesc);
432 }
433
434 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
435 // Check if a group is existing with the same key
436 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
437 return;
438 }
439
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700440 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
441 // Device group audit has not completed yet
442 // Add this group description to pending group key table
443 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700444 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700445 groupDesc.deviceId());
446 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
447 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
448 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
449 getPendingGroupKeyTable();
450 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
451 groupDesc.appCookie()),
452 group);
453 return;
454 }
455
Saurav Das100e3b82015-04-30 11:12:10 -0700456 GroupId id = null;
457 if (groupDesc.givenGroupId() == null) {
458 // Get a new group identifier
459 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
460 } else {
461 id = new DefaultGroupId(groupDesc.givenGroupId());
462 }
alshabib10580802015-02-18 18:30:33 -0800463 // Create a group entry object
464 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700465 // Insert the newly created group entry into key and id maps
466 getGroupStoreKeyMap().
467 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
468 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700469 // Ensure it also inserted into group id based table to
470 // avoid any chances of duplication in group id generation
471 getGroupIdTable(groupDesc.deviceId()).
472 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700473 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
474 id,
475 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800476 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
477 group));
478 }
479
480 /**
481 * Updates the existing group entry with the information
482 * from group description.
483 *
484 * @param deviceId the device ID
485 * @param oldAppCookie the current group key
486 * @param type update type
487 * @param newBuckets group buckets for updates
488 * @param newAppCookie optional new group key
489 */
490 @Override
491 public void updateGroupDescription(DeviceId deviceId,
492 GroupKey oldAppCookie,
493 UpdateType type,
494 GroupBuckets newBuckets,
495 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700496 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700497 if (mastershipService.getMasterFor(deviceId) != null &&
498 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700499 log.debug("updateGroupDescription: Device {} local role is not MASTER",
500 deviceId);
501 if (mastershipService.getMasterFor(deviceId) == null) {
502 log.error("No Master for device {}..."
503 + "Can not perform update group operation",
504 deviceId);
505 //TODO: Send Group operation failure event
506 return;
507 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700508 GroupStoreMessage groupOp = GroupStoreMessage.
509 createGroupUpdateRequestMsg(deviceId,
510 oldAppCookie,
511 type,
512 newBuckets,
513 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700514
515 if (!clusterCommunicator.unicast(groupOp,
516 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
517 m -> kryoBuilder.build().serialize(m),
518 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700519 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700520 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700521 mastershipService.getMasterFor(deviceId));
522 //TODO: Send Group operation failure event
523 }
524 return;
525 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700526 log.debug("updateGroupDescription for device {} is getting handled locally",
527 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700528 updateGroupDescriptionInternal(deviceId,
529 oldAppCookie,
530 type,
531 newBuckets,
532 newAppCookie);
533 }
534
535 private void updateGroupDescriptionInternal(DeviceId deviceId,
536 GroupKey oldAppCookie,
537 UpdateType type,
538 GroupBuckets newBuckets,
539 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800540 // Check if a group is existing with the provided key
541 Group oldGroup = getGroup(deviceId, oldAppCookie);
542 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700543 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800544 return;
545 }
546
547 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
548 type,
549 newBuckets);
550 if (newBucketList != null) {
551 // Create a new group object from the old group
552 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
553 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
554 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
555 oldGroup.deviceId(),
556 oldGroup.type(),
557 updatedBuckets,
558 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700559 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800560 oldGroup.appId());
561 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
562 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700563 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
564 oldGroup.id(),
565 oldGroup.deviceId(),
566 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800567 newGroup.setState(GroupState.PENDING_UPDATE);
568 newGroup.setLife(oldGroup.life());
569 newGroup.setPackets(oldGroup.packets());
570 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700571 //Update the group entry in groupkey based map.
572 //Update to groupid based map will happen in the
573 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700574 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
575 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700576 getGroupStoreKeyMap().
577 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
578 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800579 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700580 } else {
581 log.warn("updateGroupDescriptionInternal with type {}: No "
582 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800583 }
584 }
585
586 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
587 UpdateType type,
588 GroupBuckets buckets) {
589 GroupBuckets oldBuckets = oldGroup.buckets();
590 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
591 oldBuckets.buckets());
592 boolean groupDescUpdated = false;
593
594 if (type == UpdateType.ADD) {
595 // Check if the any of the new buckets are part of
596 // the old bucket list
597 for (GroupBucket addBucket:buckets.buckets()) {
598 if (!newBucketList.contains(addBucket)) {
599 newBucketList.add(addBucket);
600 groupDescUpdated = true;
601 }
602 }
603 } else if (type == UpdateType.REMOVE) {
604 // Check if the to be removed buckets are part of the
605 // old bucket list
606 for (GroupBucket removeBucket:buckets.buckets()) {
607 if (newBucketList.contains(removeBucket)) {
608 newBucketList.remove(removeBucket);
609 groupDescUpdated = true;
610 }
611 }
612 }
613
614 if (groupDescUpdated) {
615 return newBucketList;
616 } else {
617 return null;
618 }
619 }
620
621 /**
622 * Triggers deleting the existing group entry.
623 *
624 * @param deviceId the device ID
625 * @param appCookie the group key
626 */
627 @Override
628 public void deleteGroupDescription(DeviceId deviceId,
629 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700630 // Check if group to be deleted by a remote instance
631 if (mastershipService.
632 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700633 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
634 deviceId);
635 if (mastershipService.getMasterFor(deviceId) == null) {
636 log.error("No Master for device {}..."
637 + "Can not perform delete group operation",
638 deviceId);
639 //TODO: Send Group operation failure event
640 return;
641 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700642 GroupStoreMessage groupOp = GroupStoreMessage.
643 createGroupDeleteRequestMsg(deviceId,
644 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700645
646 if (!clusterCommunicator.unicast(groupOp,
647 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
648 m -> kryoBuilder.build().serialize(m),
649 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700650 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700651 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700652 mastershipService.getMasterFor(deviceId));
653 //TODO: Send Group operation failure event
654 }
655 return;
656 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700657 log.debug("deleteGroupDescription in device {} is getting handled locally",
658 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700659 deleteGroupDescriptionInternal(deviceId, appCookie);
660 }
661
662 private void deleteGroupDescriptionInternal(DeviceId deviceId,
663 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800664 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700665 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800666 if (existing == null) {
667 return;
668 }
669
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700670 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
671 existing.id(),
672 existing.deviceId(),
673 existing.state());
alshabib10580802015-02-18 18:30:33 -0800674 synchronized (existing) {
675 existing.setState(GroupState.PENDING_DELETE);
676 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700677 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
678 deviceId);
alshabib10580802015-02-18 18:30:33 -0800679 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
680 }
681
682 /**
683 * Stores a new group entry, or updates an existing entry.
684 *
685 * @param group group entry
686 */
687 @Override
688 public void addOrUpdateGroupEntry(Group group) {
689 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700690 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
691 group.id());
alshabib10580802015-02-18 18:30:33 -0800692 GroupEvent event = null;
693
694 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700695 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700696 group.id(),
697 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800698 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700699 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700700 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700701 existing.buckets().buckets()
702 .stream()
703 .filter((existingBucket)->(existingBucket.equals(bucket)))
704 .findFirst();
705 if (matchingBucket.isPresent()) {
706 ((StoredGroupBucketEntry) matchingBucket.
707 get()).setPackets(bucket.packets());
708 ((StoredGroupBucketEntry) matchingBucket.
709 get()).setBytes(bucket.bytes());
710 } else {
711 log.warn("addOrUpdateGroupEntry: No matching "
712 + "buckets to update stats");
713 }
714 }
alshabib10580802015-02-18 18:30:33 -0800715 existing.setLife(group.life());
716 existing.setPackets(group.packets());
717 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700718 if ((existing.state() == GroupState.PENDING_ADD) ||
719 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700720 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
721 existing.id(),
722 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700723 existing.state());
alshabib10580802015-02-18 18:30:33 -0800724 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700725 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800726 event = new GroupEvent(Type.GROUP_ADDED, existing);
727 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700728 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
729 existing.id(),
730 existing.deviceId(),
731 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700732 existing.setState(GroupState.ADDED);
733 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800734 event = new GroupEvent(Type.GROUP_UPDATED, existing);
735 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700736 //Re-PUT map entries to trigger map update events
737 getGroupStoreKeyMap().
738 put(new GroupStoreKeyMapKey(existing.deviceId(),
739 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800740 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700741 } else {
742 log.warn("addOrUpdateGroupEntry: Group update "
743 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800744 }
745
746 if (event != null) {
747 notifyDelegate(event);
748 }
749 }
750
751 /**
752 * Removes the group entry from store.
753 *
754 * @param group group entry
755 */
756 @Override
757 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700758 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
759 group.id());
alshabib10580802015-02-18 18:30:33 -0800760
761 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700762 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700763 group.id(),
764 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700765 //Removal from groupid based map will happen in the
766 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700767 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
768 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800769 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700770 } else {
771 log.warn("removeGroupEntry for {} in device{} is "
772 + "not existing in our maps",
773 group.id(),
774 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800775 }
776 }
777
778 @Override
779 public void deviceInitialAuditCompleted(DeviceId deviceId,
780 boolean completed) {
781 synchronized (deviceAuditStatus) {
782 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700783 log.debug("AUDIT completed for device {}",
784 deviceId);
alshabib10580802015-02-18 18:30:33 -0800785 deviceAuditStatus.put(deviceId, true);
786 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700787 List<StoredGroupEntry> pendingGroupRequests =
788 getPendingGroupKeyTable().values()
789 .stream()
790 .filter(g-> g.deviceId().equals(deviceId))
791 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700792 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700793 deviceId,
794 pendingGroupRequests.size());
795 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800796 GroupDescription tmp = new DefaultGroupDescription(
797 group.deviceId(),
798 group.type(),
799 group.buckets(),
800 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700801 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800802 group.appId());
803 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700804 getPendingGroupKeyTable().
805 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800806 }
alshabib10580802015-02-18 18:30:33 -0800807 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700808 Boolean audited = deviceAuditStatus.get(deviceId);
809 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700810 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800811 deviceAuditStatus.put(deviceId, false);
812 }
813 }
814 }
815 }
816
817 @Override
818 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
819 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700820 Boolean audited = deviceAuditStatus.get(deviceId);
821 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800822 }
823 }
824
825 @Override
826 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
827
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700828 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
829 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800830
831 if (existing == null) {
832 log.warn("No group entry with ID {} found ", operation.groupId());
833 return;
834 }
835
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700836 log.warn("groupOperationFailed: group operation {} failed"
837 + "for group {} in device {}",
838 operation.opType(),
839 existing.id(),
840 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800841 switch (operation.opType()) {
842 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700843 if (existing.state() == GroupState.PENDING_ADD) {
844 //TODO: Need to add support for passing the group
845 //operation failure reason from group provider.
846 //If the error type is anything other than GROUP_EXISTS,
847 //then the GROUP_ADD_FAILED event should be raised even
848 //in PENDING_ADD_RETRY state also.
849 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
850 log.warn("groupOperationFailed: cleaningup "
851 + "group {} from store in device {}....",
852 existing.id(),
853 existing.deviceId());
854 //Removal from groupid based map will happen in the
855 //map update listener
856 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
857 existing.appCookie()));
858 }
alshabib10580802015-02-18 18:30:33 -0800859 break;
860 case MODIFY:
861 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
862 break;
863 case DELETE:
864 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
865 break;
866 default:
867 log.warn("Unknown group operation type {}", operation.opType());
868 }
alshabib10580802015-02-18 18:30:33 -0800869 }
870
871 @Override
872 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700873 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700874 group.id(),
875 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800876 ConcurrentMap<GroupId, Group> extraneousIdTable =
877 getExtraneousGroupIdTable(group.deviceId());
878 extraneousIdTable.put(group.id(), group);
879 // Check the reference counter
880 if (group.referenceCount() == 0) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700881 log.debug("Flow reference counter is zero and triggering remove",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700882 group.id(),
883 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800884 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
885 }
886 }
887
888 @Override
889 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700890 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700891 group.id(),
892 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800893 ConcurrentMap<GroupId, Group> extraneousIdTable =
894 getExtraneousGroupIdTable(group.deviceId());
895 extraneousIdTable.remove(group.id());
896 }
897
898 @Override
899 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
900 // flatten and make iterator unmodifiable
901 return FluentIterable.from(
902 getExtraneousGroupIdTable(deviceId).values());
903 }
904
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700905 /**
906 * ClockService that generates wallclock based timestamps.
907 */
908 private class GroupStoreLogicalClockManager<T, U>
909 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800910
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700911 private final AtomicLong sequenceNumber = new AtomicLong(0);
912
913 @Override
914 public Timestamp getTimestamp(T t1, U u1) {
915 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
916 sequenceNumber.getAndIncrement());
917 }
918 }
919
920 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700921 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700922 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700923 private class GroupStoreKeyMapListener implements
924 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700925
926 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700927 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700928 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700929 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700930 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700931 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700932 if ((key == null) && (group == null)) {
933 log.error("GroupStoreKeyMapListener: Received "
934 + "event {} with null entry", mapEvent.type());
935 return;
936 } else if (group == null) {
937 group = getGroupIdTable(key.deviceId()).values()
938 .stream()
939 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
940 .findFirst().get();
941 if (group == null) {
942 log.error("GroupStoreKeyMapListener: Received "
943 + "event {} with null entry... can not process", mapEvent.type());
944 return;
945 }
946 }
947 log.trace("received groupid map event {} for id {} in device {}",
948 mapEvent.type(),
949 group.id(),
950 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700951 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700952 // Update the group ID table
953 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700954 if (mapEvent.value().state() == Group.GroupState.ADDED) {
955 if (mapEvent.value().isGroupStateAddedFirstTime()) {
956 groupEvent = new GroupEvent(Type.GROUP_ADDED,
957 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700958 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
959 group.id(),
960 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700961 } else {
962 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
963 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700964 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
965 group.id(),
966 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700967 }
968 }
969 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700970 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700971 // Remove the entry from the group ID table
972 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700973 }
974
975 if (groupEvent != null) {
976 notifyDelegate(groupEvent);
977 }
978 }
979 }
980 /**
981 * Message handler to receive messages from group subsystems of
982 * other cluster members.
983 */
984 private final class ClusterGroupMsgHandler
985 implements ClusterMessageHandler {
986 @Override
987 public void handle(ClusterMessage message) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700988 if (message.subject().equals(
989 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700990 GroupStoreMessage groupOp = kryoBuilder.
991 build().deserialize(message.payload());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700992 log.debug("received remote group operation {} request for device {}",
993 groupOp.type(),
994 groupOp.deviceId());
995 if (mastershipService.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700996 getLocalRole(groupOp.deviceId()) !=
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700997 MastershipRole.MASTER) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700998 log.warn("ClusterGroupMsgHandler: This node is not "
999 + "MASTER for device {}", groupOp.deviceId());
1000 return;
1001 }
1002 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001003 storeGroupDescriptionInternal(groupOp.groupDesc());
1004 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001005 updateGroupDescriptionInternal(groupOp.deviceId(),
1006 groupOp.appCookie(),
1007 groupOp.updateType(),
1008 groupOp.updateBuckets(),
1009 groupOp.newAppCookie());
1010 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001011 deleteGroupDescriptionInternal(groupOp.deviceId(),
1012 groupOp.appCookie());
1013 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001014 } else {
1015 log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
1016 message.subject());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001017 }
1018 }
1019 }
1020
1021 /**
1022 * Flattened map key to be used to store group entries.
1023 */
1024 private class GroupStoreMapKey {
1025 private final DeviceId deviceId;
1026
1027 public GroupStoreMapKey(DeviceId deviceId) {
1028 this.deviceId = deviceId;
1029 }
1030
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001031 public DeviceId deviceId() {
1032 return deviceId;
1033 }
1034
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001035 @Override
1036 public boolean equals(Object o) {
1037 if (this == o) {
1038 return true;
1039 }
1040 if (!(o instanceof GroupStoreMapKey)) {
1041 return false;
1042 }
1043 GroupStoreMapKey that = (GroupStoreMapKey) o;
1044 return this.deviceId.equals(that.deviceId);
1045 }
1046
1047 @Override
1048 public int hashCode() {
1049 int result = 17;
1050
1051 result = 31 * result + Objects.hash(this.deviceId);
1052
1053 return result;
1054 }
1055 }
1056
1057 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
1058 private final GroupKey appCookie;
1059 public GroupStoreKeyMapKey(DeviceId deviceId,
1060 GroupKey appCookie) {
1061 super(deviceId);
1062 this.appCookie = appCookie;
1063 }
1064
1065 @Override
1066 public boolean equals(Object o) {
1067 if (this == o) {
1068 return true;
1069 }
1070 if (!(o instanceof GroupStoreKeyMapKey)) {
1071 return false;
1072 }
1073 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1074 return (super.equals(that) &&
1075 this.appCookie.equals(that.appCookie));
1076 }
1077
1078 @Override
1079 public int hashCode() {
1080 int result = 17;
1081
1082 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1083
1084 return result;
1085 }
1086 }
1087
1088 private class GroupStoreIdMapKey extends GroupStoreMapKey {
1089 private final GroupId groupId;
1090 public GroupStoreIdMapKey(DeviceId deviceId,
1091 GroupId groupId) {
1092 super(deviceId);
1093 this.groupId = groupId;
1094 }
1095
1096 @Override
1097 public boolean equals(Object o) {
1098 if (this == o) {
1099 return true;
1100 }
1101 if (!(o instanceof GroupStoreIdMapKey)) {
1102 return false;
1103 }
1104 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1105 return (super.equals(that) &&
1106 this.groupId.equals(that.groupId));
1107 }
1108
1109 @Override
1110 public int hashCode() {
1111 int result = 17;
1112
1113 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1114
1115 return result;
1116 }
1117 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001118
1119 @Override
1120 public void pushGroupMetrics(DeviceId deviceId,
1121 Collection<Group> groupEntries) {
1122 boolean deviceInitialAuditStatus =
1123 deviceInitialAuditStatus(deviceId);
1124 Set<Group> southboundGroupEntries =
1125 Sets.newHashSet(groupEntries);
1126 Set<StoredGroupEntry> storedGroupEntries =
1127 Sets.newHashSet(getStoredGroups(deviceId));
1128 Set<Group> extraneousStoredEntries =
1129 Sets.newHashSet(getExtraneousGroups(deviceId));
1130
1131 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1132 southboundGroupEntries.size(),
1133 deviceId);
1134 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1135 Group group = it.next();
1136 log.trace("Group {} in device {}", group, deviceId);
1137 }
1138
1139 log.trace("Displaying all ({}) stored group entries for device {}",
1140 storedGroupEntries.size(),
1141 deviceId);
1142 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1143 it1.hasNext();) {
1144 Group group = it1.next();
1145 log.trace("Stored Group {} for device {}", group, deviceId);
1146 }
1147
1148 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1149 Group group = it2.next();
1150 if (storedGroupEntries.remove(group)) {
1151 // we both have the group, let's update some info then.
1152 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1153 group.id(), deviceId);
1154 groupAdded(group);
1155 it2.remove();
1156 }
1157 }
1158 for (Group group : southboundGroupEntries) {
1159 if (getGroup(group.deviceId(), group.id()) != null) {
1160 // There is a group existing with the same id
1161 // It is possible that group update is
1162 // in progress while we got a stale info from switch
1163 if (!storedGroupEntries.remove(getGroup(
1164 group.deviceId(), group.id()))) {
1165 log.warn("Group AUDIT: Inconsistent state:"
1166 + "Group exists in ID based table while "
1167 + "not present in key based table");
1168 }
1169 } else {
1170 // there are groups in the switch that aren't in the store
1171 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1172 group.id(), deviceId);
1173 extraneousStoredEntries.remove(group);
1174 extraneousGroup(group);
1175 }
1176 }
1177 for (Group group : storedGroupEntries) {
1178 // there are groups in the store that aren't in the switch
1179 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1180 group.id(), deviceId);
1181 groupMissing(group);
1182 }
1183 for (Group group : extraneousStoredEntries) {
1184 // there are groups in the extraneous store that
1185 // aren't in the switch
1186 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1187 group.id(), deviceId);
1188 removeExtraneousGroupEntry(group);
1189 }
1190
1191 if (!deviceInitialAuditStatus) {
1192 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1193 deviceId);
1194 deviceInitialAuditCompleted(deviceId, true);
1195 }
1196 }
1197
1198 private void groupMissing(Group group) {
1199 switch (group.state()) {
1200 case PENDING_DELETE:
1201 log.debug("Group {} delete confirmation from device {}",
1202 group, group.deviceId());
1203 removeGroupEntry(group);
1204 break;
1205 case ADDED:
1206 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001207 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001208 case PENDING_UPDATE:
1209 log.debug("Group {} is in store but not on device {}",
1210 group, group.deviceId());
1211 StoredGroupEntry existing =
1212 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001213 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001214 existing.id(),
1215 existing.deviceId(),
1216 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001217 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001218 //Re-PUT map entries to trigger map update events
1219 getGroupStoreKeyMap().
1220 put(new GroupStoreKeyMapKey(existing.deviceId(),
1221 existing.appCookie()), existing);
1222 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1223 group));
1224 break;
1225 default:
1226 log.debug("Group {} has not been installed.", group);
1227 break;
1228 }
1229 }
1230
1231 private void extraneousGroup(Group group) {
1232 log.debug("Group {} is on device {} but not in store.",
1233 group, group.deviceId());
1234 addOrUpdateExtraneousGroupEntry(group);
1235 }
1236
1237 private void groupAdded(Group group) {
1238 log.trace("Group {} Added or Updated in device {}",
1239 group, group.deviceId());
1240 addOrUpdateGroupEntry(group);
1241 }
alshabib10580802015-02-18 18:30:33 -08001242}