blob: 8b2db26a76c33b82bf26803bdc18098184bacdbd [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070039import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.flow.instructions.L0ModificationInstruction;
41import org.onosproject.net.flow.instructions.L2ModificationInstruction;
42import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080043import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070044import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080045import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070046import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080047import org.onosproject.net.group.Group;
48import org.onosproject.net.group.Group.GroupState;
49import org.onosproject.net.group.GroupBucket;
50import org.onosproject.net.group.GroupBuckets;
51import org.onosproject.net.group.GroupDescription;
52import org.onosproject.net.group.GroupEvent;
53import org.onosproject.net.group.GroupEvent.Type;
54import org.onosproject.net.group.GroupKey;
55import org.onosproject.net.group.GroupOperation;
56import org.onosproject.net.group.GroupStore;
57import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070058import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080059import org.onosproject.net.group.StoredGroupEntry;
60import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070061import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070062import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070063import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070064import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070065import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapBuilder;
68import org.onosproject.store.service.EventuallyConsistentMapEvent;
69import org.onosproject.store.service.EventuallyConsistentMapListener;
70import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080071import org.slf4j.Logger;
72
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.net.URI;
74import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070075import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070076import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070077import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.util.List;
79import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070080import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070081import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070082import java.util.concurrent.ConcurrentHashMap;
83import java.util.concurrent.ConcurrentMap;
84import java.util.concurrent.ExecutorService;
85import java.util.concurrent.Executors;
86import java.util.concurrent.atomic.AtomicInteger;
87import java.util.concurrent.atomic.AtomicLong;
88import java.util.stream.Collectors;
89
90import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
91import static org.onlab.util.Tools.groupedThreads;
92import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080093
/**
 * Manages inventory of group entries using eventually consistent maps
 * and cluster messaging to forward group operations to the device master.
 */
97@Component(immediate = true)
98@Service
99public class DistributedGroupStore
100 extends AbstractStore<GroupEvent, GroupStoreDelegate>
101 implements GroupStore {
102
103 private final Logger log = getLogger(getClass());
104
105 private final int dummyId = 0xffffffff;
106 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
107
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterCommunicationService clusterCommunicator;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterService clusterService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 protected StorageService storageService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700118 protected MastershipService mastershipService;
119
120 // Per device group table with (device id + app cookie) as key
121 private EventuallyConsistentMap<GroupStoreKeyMapKey,
122 StoredGroupEntry> groupStoreEntriesByKey = null;
123 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
125 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private EventuallyConsistentMap<GroupStoreKeyMapKey,
127 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800128 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
129 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700130 private ExecutorService messageHandlingExecutor;
131 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800132
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700133 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
alshabib10580802015-02-18 18:30:33 -0800134
135 private final AtomicInteger groupIdGen = new AtomicInteger();
136
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700137 private KryoNamespace.Builder kryoBuilder = null;
138
Madan Jampanibcf1a482015-06-24 19:05:56 -0700139 private final AtomicLong sequenceNumber = new AtomicLong(0);
140
alshabib10580802015-02-18 18:30:33 -0800141 @Activate
142 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700143 kryoBuilder = new KryoNamespace.Builder()
144 .register(DefaultGroup.class,
145 DefaultGroupBucket.class,
146 DefaultGroupDescription.class,
147 DefaultGroupKey.class,
148 GroupDescription.Type.class,
149 Group.GroupState.class,
150 GroupBuckets.class,
151 DefaultGroupId.class,
152 GroupStoreMessage.class,
153 GroupStoreMessage.Type.class,
154 UpdateType.class,
155 GroupStoreMessageSubjects.class,
156 MultiValuedTimestamp.class,
157 GroupStoreKeyMapKey.class,
158 GroupStoreIdMapKey.class,
159 GroupStoreMapKey.class
160 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700161 .register(new URISerializer(), URI.class)
162 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700163 .register(PortNumber.class)
164 .register(DefaultApplicationId.class)
165 .register(DefaultTrafficTreatment.class,
166 Instructions.DropInstruction.class,
167 Instructions.OutputInstruction.class,
168 Instructions.GroupInstruction.class,
169 Instructions.TableTypeTransition.class,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700170 L0ModificationInstruction.class,
171 L0ModificationInstruction.L0SubType.class,
172 L0ModificationInstruction.ModLambdaInstruction.class,
173 L2ModificationInstruction.class,
174 L2ModificationInstruction.L2SubType.class,
175 L2ModificationInstruction.ModEtherInstruction.class,
176 L2ModificationInstruction.PushHeaderInstructions.class,
177 L2ModificationInstruction.ModVlanIdInstruction.class,
178 L2ModificationInstruction.ModVlanPcpInstruction.class,
179 L2ModificationInstruction.ModMplsLabelInstruction.class,
180 L2ModificationInstruction.ModMplsTtlInstruction.class,
181 L3ModificationInstruction.class,
182 L3ModificationInstruction.L3SubType.class,
183 L3ModificationInstruction.ModIPInstruction.class,
184 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
185 L3ModificationInstruction.ModTtlInstruction.class,
186 org.onlab.packet.MplsLabel.class
187 )
188 .register(org.onosproject.cluster.NodeId.class)
189 .register(KryoNamespaces.BASIC)
190 .register(KryoNamespaces.MISC);
191
192 messageHandlingExecutor = Executors.
193 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
194 groupedThreads("onos/store/group",
195 "message-handlers"));
Madan Jampani01e05fb2015-08-13 13:29:36 -0700196
197 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
198 kryoBuilder.build()::deserialize,
199 this::process,
200 messageHandlingExecutor);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700201
202 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700203 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
204 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
205
206 groupStoreEntriesByKey = keyMapBuilder
207 .withName("groupstorekeymap")
208 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700209 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
210 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700211 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700212 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700213 log.debug("Current size of groupstorekeymap:{}",
214 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700215
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700216 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700217 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
218 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
219
220 auditPendingReqQueue = auditMapBuilder
221 .withName("pendinggroupkeymap")
222 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700223 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
224 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700225 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700226 log.debug("Current size of pendinggroupkeymap:{}",
227 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700228
alshabib10580802015-02-18 18:30:33 -0800229 log.info("Started");
230 }
231
232 @Deactivate
233 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700234 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700235 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800236 log.info("Stopped");
237 }
238
alshabib10580802015-02-18 18:30:33 -0800239 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700240 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800241 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
242 }
243
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700244 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
245 lazyEmptyGroupIdTable() {
246 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
247 }
248
alshabib10580802015-02-18 18:30:33 -0800249 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700250 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800251 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700252 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800253 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700254 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
255 getGroupStoreKeyMap() {
256 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800257 }
258
259 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700260 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800261 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700262 * @param deviceId identifier of the device
263 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800264 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700265 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
266 return createIfAbsentUnchecked(groupEntriesById,
267 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800268 }
269
270 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700271 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800272 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700273 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800274 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
276 getPendingGroupKeyTable() {
277 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800278 }
279
280 /**
281 * Returns the extraneous group id table for specified device.
282 *
283 * @param deviceId identifier of the device
284 * @return Map representing group key table of given device.
285 */
286 private ConcurrentMap<GroupId, Group>
287 getExtraneousGroupIdTable(DeviceId deviceId) {
288 return createIfAbsentUnchecked(extraneousGroupEntriesById,
289 deviceId,
290 lazyEmptyExtraneousGroupIdTable());
291 }
292
293 /**
294 * Returns the number of groups for the specified device in the store.
295 *
296 * @return number of groups for the specified device
297 */
298 @Override
299 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700300 return (getGroups(deviceId) != null) ?
301 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800302 }
303
304 /**
305 * Returns the groups associated with a device.
306 *
307 * @param deviceId the device ID
308 *
309 * @return the group entries
310 */
311 @Override
312 public Iterable<Group> getGroups(DeviceId deviceId) {
313 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700314 return FluentIterable.from(getGroupStoreKeyMap().values())
315 .filter(input -> input.deviceId().equals(deviceId))
316 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800317 }
318
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700319 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
320 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700321 return FluentIterable.from(getGroupStoreKeyMap().values())
322 .filter(input -> input.deviceId().equals(deviceId));
323 }
324
alshabib10580802015-02-18 18:30:33 -0800325 /**
326 * Returns the stored group entry.
327 *
328 * @param deviceId the device ID
329 * @param appCookie the group key
330 *
331 * @return a group associated with the key
332 */
333 @Override
334 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700335 return getStoredGroupEntry(deviceId, appCookie);
336 }
337
338 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
339 GroupKey appCookie) {
340 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
341 appCookie));
342 }
343
344 @Override
345 public Group getGroup(DeviceId deviceId, GroupId groupId) {
346 return getStoredGroupEntry(deviceId, groupId);
347 }
348
349 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
350 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700351 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800352 }
353
354 private int getFreeGroupIdValue(DeviceId deviceId) {
355 int freeId = groupIdGen.incrementAndGet();
356
357 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700358 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800359 if (existing == null) {
360 existing = (
361 extraneousGroupEntriesById.get(deviceId) != null) ?
362 extraneousGroupEntriesById.get(deviceId).
363 get(new DefaultGroupId(freeId)) :
364 null;
365 }
366 if (existing != null) {
367 freeId = groupIdGen.incrementAndGet();
368 } else {
369 break;
370 }
371 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700372 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800373 return freeId;
374 }
375
376 /**
377 * Stores a new group entry using the information from group description.
378 *
379 * @param groupDesc group description to be used to create group entry
380 */
381 @Override
382 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700383 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800384 // Check if a group is existing with the same key
385 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700386 log.warn("Group already exists with the same key {}",
387 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800388 return;
389 }
390
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700391 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700392 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700393 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700394 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700395 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
396 log.error("No Master for device {}..."
397 + "Can not perform add group operation",
398 groupDesc.deviceId());
399 //TODO: Send Group operation failure event
400 return;
401 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700402 GroupStoreMessage groupOp = GroupStoreMessage.
403 createGroupAddRequestMsg(groupDesc.deviceId(),
404 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700405
Madan Jampani175e8fd2015-05-20 14:10:45 -0700406 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700407 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
408 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700409 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
410 if (error != null) {
411 log.warn("Failed to send request to master: {} to {}",
412 groupOp,
413 mastershipService.getMasterFor(groupDesc.deviceId()));
414 //TODO: Send Group operation failure event
415 } else {
416 log.debug("Sent Group operation request for device {} "
417 + "to remote MASTER {}",
418 groupDesc.deviceId(),
419 mastershipService.getMasterFor(groupDesc.deviceId()));
420 }
421 });
alshabib10580802015-02-18 18:30:33 -0800422 return;
423 }
424
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700425 log.debug("Store group for device {} is getting handled locally",
426 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800427 storeGroupDescriptionInternal(groupDesc);
428 }
429
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700430 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
431 ConcurrentMap<GroupId, Group> extraneousMap =
432 extraneousGroupEntriesById.get(deviceId);
433 if (extraneousMap == null) {
434 return null;
435 }
436 return extraneousMap.get(new DefaultGroupId(groupId));
437 }
438
439 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
440 GroupBuckets buckets) {
441 ConcurrentMap<GroupId, Group> extraneousMap =
442 extraneousGroupEntriesById.get(deviceId);
443 if (extraneousMap == null) {
444 return null;
445 }
446
447 for (Group extraneousGroup:extraneousMap.values()) {
448 if (extraneousGroup.buckets().equals(buckets)) {
449 return extraneousGroup;
450 }
451 }
452 return null;
453 }
454
alshabib10580802015-02-18 18:30:33 -0800455 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
456 // Check if a group is existing with the same key
457 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
458 return;
459 }
460
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700461 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
462 // Device group audit has not completed yet
463 // Add this group description to pending group key table
464 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700465 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700466 groupDesc.deviceId());
467 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
468 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
469 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
470 getPendingGroupKeyTable();
471 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
472 groupDesc.appCookie()),
473 group);
474 return;
475 }
476
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700477 Group matchingExtraneousGroup = null;
478 if (groupDesc.givenGroupId() != null) {
479 //Check if there is a extraneous group existing with the same Id
480 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
481 groupDesc.deviceId(), groupDesc.givenGroupId());
482 if (matchingExtraneousGroup != null) {
483 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
484 groupDesc.deviceId(),
485 groupDesc.givenGroupId());
486 //Check if the group buckets matches with user provided buckets
487 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
488 //Group is already existing with the same buckets and Id
489 // Create a group entry object
490 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
491 groupDesc.deviceId(),
492 groupDesc.givenGroupId());
493 StoredGroupEntry group = new DefaultGroup(
494 matchingExtraneousGroup.id(), groupDesc);
495 // Insert the newly created group entry into key and id maps
496 getGroupStoreKeyMap().
497 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
498 groupDesc.appCookie()), group);
499 // Ensure it also inserted into group id based table to
500 // avoid any chances of duplication in group id generation
501 getGroupIdTable(groupDesc.deviceId()).
502 put(matchingExtraneousGroup.id(), group);
503 addOrUpdateGroupEntry(matchingExtraneousGroup);
504 removeExtraneousGroupEntry(matchingExtraneousGroup);
505 return;
506 } else {
507 //Group buckets are not matching. Update group
508 //with user provided buckets.
509 //TODO
510 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
511 groupDesc.deviceId(),
512 groupDesc.givenGroupId());
513 }
514 }
515 } else {
516 //Check if there is an extraneous group with user provided buckets
517 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
518 groupDesc.deviceId(), groupDesc.buckets());
519 if (matchingExtraneousGroup != null) {
520 //Group is already existing with the same buckets.
521 //So reuse this group.
522 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
523 groupDesc.deviceId());
524 //Create a group entry object
525 StoredGroupEntry group = new DefaultGroup(
526 matchingExtraneousGroup.id(), groupDesc);
527 // Insert the newly created group entry into key and id maps
528 getGroupStoreKeyMap().
529 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
530 groupDesc.appCookie()), group);
531 // Ensure it also inserted into group id based table to
532 // avoid any chances of duplication in group id generation
533 getGroupIdTable(groupDesc.deviceId()).
534 put(matchingExtraneousGroup.id(), group);
535 addOrUpdateGroupEntry(matchingExtraneousGroup);
536 removeExtraneousGroupEntry(matchingExtraneousGroup);
537 return;
538 } else {
539 //TODO: Check if there are any empty groups that can be used here
540 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
541 groupDesc.deviceId());
542 }
543 }
544
Saurav Das100e3b82015-04-30 11:12:10 -0700545 GroupId id = null;
546 if (groupDesc.givenGroupId() == null) {
547 // Get a new group identifier
548 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
549 } else {
550 id = new DefaultGroupId(groupDesc.givenGroupId());
551 }
alshabib10580802015-02-18 18:30:33 -0800552 // Create a group entry object
553 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700554 // Insert the newly created group entry into key and id maps
555 getGroupStoreKeyMap().
556 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
557 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700558 // Ensure it also inserted into group id based table to
559 // avoid any chances of duplication in group id generation
560 getGroupIdTable(groupDesc.deviceId()).
561 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700562 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
563 id,
564 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800565 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
566 group));
567 }
568
569 /**
570 * Updates the existing group entry with the information
571 * from group description.
572 *
573 * @param deviceId the device ID
574 * @param oldAppCookie the current group key
575 * @param type update type
576 * @param newBuckets group buckets for updates
577 * @param newAppCookie optional new group key
578 */
579 @Override
580 public void updateGroupDescription(DeviceId deviceId,
581 GroupKey oldAppCookie,
582 UpdateType type,
583 GroupBuckets newBuckets,
584 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700585 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700586 if (mastershipService.getMasterFor(deviceId) != null &&
587 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700588 log.debug("updateGroupDescription: Device {} local role is not MASTER",
589 deviceId);
590 if (mastershipService.getMasterFor(deviceId) == null) {
591 log.error("No Master for device {}..."
592 + "Can not perform update group operation",
593 deviceId);
594 //TODO: Send Group operation failure event
595 return;
596 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700597 GroupStoreMessage groupOp = GroupStoreMessage.
598 createGroupUpdateRequestMsg(deviceId,
599 oldAppCookie,
600 type,
601 newBuckets,
602 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700603
Madan Jampani175e8fd2015-05-20 14:10:45 -0700604 clusterCommunicator.unicast(groupOp,
605 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
606 m -> kryoBuilder.build().serialize(m),
607 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
608 if (error != null) {
609 log.warn("Failed to send request to master: {} to {}",
610 groupOp,
611 mastershipService.getMasterFor(deviceId), error);
612 }
613 //TODO: Send Group operation failure event
614 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700615 return;
616 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700617 log.debug("updateGroupDescription for device {} is getting handled locally",
618 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700619 updateGroupDescriptionInternal(deviceId,
620 oldAppCookie,
621 type,
622 newBuckets,
623 newAppCookie);
624 }
625
626 private void updateGroupDescriptionInternal(DeviceId deviceId,
627 GroupKey oldAppCookie,
628 UpdateType type,
629 GroupBuckets newBuckets,
630 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800631 // Check if a group is existing with the provided key
632 Group oldGroup = getGroup(deviceId, oldAppCookie);
633 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700634 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800635 return;
636 }
637
638 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
639 type,
640 newBuckets);
641 if (newBucketList != null) {
642 // Create a new group object from the old group
643 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
644 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
645 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
646 oldGroup.deviceId(),
647 oldGroup.type(),
648 updatedBuckets,
649 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700650 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800651 oldGroup.appId());
652 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
653 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700654 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
655 oldGroup.id(),
656 oldGroup.deviceId(),
657 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800658 newGroup.setState(GroupState.PENDING_UPDATE);
659 newGroup.setLife(oldGroup.life());
660 newGroup.setPackets(oldGroup.packets());
661 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700662 //Update the group entry in groupkey based map.
663 //Update to groupid based map will happen in the
664 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700665 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
666 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700667 getGroupStoreKeyMap().
668 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
669 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800670 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700671 } else {
672 log.warn("updateGroupDescriptionInternal with type {}: No "
673 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800674 }
675 }
676
677 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
678 UpdateType type,
679 GroupBuckets buckets) {
680 GroupBuckets oldBuckets = oldGroup.buckets();
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700681 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
alshabib10580802015-02-18 18:30:33 -0800682 boolean groupDescUpdated = false;
683
684 if (type == UpdateType.ADD) {
685 // Check if the any of the new buckets are part of
686 // the old bucket list
687 for (GroupBucket addBucket:buckets.buckets()) {
688 if (!newBucketList.contains(addBucket)) {
689 newBucketList.add(addBucket);
690 groupDescUpdated = true;
691 }
692 }
693 } else if (type == UpdateType.REMOVE) {
694 // Check if the to be removed buckets are part of the
695 // old bucket list
696 for (GroupBucket removeBucket:buckets.buckets()) {
697 if (newBucketList.contains(removeBucket)) {
698 newBucketList.remove(removeBucket);
699 groupDescUpdated = true;
700 }
701 }
702 }
703
704 if (groupDescUpdated) {
705 return newBucketList;
706 } else {
707 return null;
708 }
709 }
710
711 /**
712 * Triggers deleting the existing group entry.
713 *
714 * @param deviceId the device ID
715 * @param appCookie the group key
716 */
717 @Override
718 public void deleteGroupDescription(DeviceId deviceId,
719 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700720 // Check if group to be deleted by a remote instance
721 if (mastershipService.
722 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700723 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
724 deviceId);
725 if (mastershipService.getMasterFor(deviceId) == null) {
726 log.error("No Master for device {}..."
727 + "Can not perform delete group operation",
728 deviceId);
729 //TODO: Send Group operation failure event
730 return;
731 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700732 GroupStoreMessage groupOp = GroupStoreMessage.
733 createGroupDeleteRequestMsg(deviceId,
734 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700735
Madan Jampani175e8fd2015-05-20 14:10:45 -0700736 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700737 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
738 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700739 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
740 if (error != null) {
741 log.warn("Failed to send request to master: {} to {}",
742 groupOp,
743 mastershipService.getMasterFor(deviceId), error);
744 }
745 //TODO: Send Group operation failure event
746 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700747 return;
748 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700749 log.debug("deleteGroupDescription in device {} is getting handled locally",
750 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700751 deleteGroupDescriptionInternal(deviceId, appCookie);
752 }
753
754 private void deleteGroupDescriptionInternal(DeviceId deviceId,
755 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800756 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700757 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800758 if (existing == null) {
759 return;
760 }
761
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700762 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
763 existing.id(),
764 existing.deviceId(),
765 existing.state());
alshabib10580802015-02-18 18:30:33 -0800766 synchronized (existing) {
767 existing.setState(GroupState.PENDING_DELETE);
768 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700769 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
770 deviceId);
alshabib10580802015-02-18 18:30:33 -0800771 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
772 }
773
774 /**
775 * Stores a new group entry, or updates an existing entry.
776 *
777 * @param group group entry
778 */
779 @Override
780 public void addOrUpdateGroupEntry(Group group) {
781 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700782 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
783 group.id());
alshabib10580802015-02-18 18:30:33 -0800784 GroupEvent event = null;
785
786 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700787 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700788 group.id(),
789 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800790 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700791 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700792 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700793 existing.buckets().buckets()
794 .stream()
795 .filter((existingBucket)->(existingBucket.equals(bucket)))
796 .findFirst();
797 if (matchingBucket.isPresent()) {
798 ((StoredGroupBucketEntry) matchingBucket.
799 get()).setPackets(bucket.packets());
800 ((StoredGroupBucketEntry) matchingBucket.
801 get()).setBytes(bucket.bytes());
802 } else {
803 log.warn("addOrUpdateGroupEntry: No matching "
804 + "buckets to update stats");
805 }
806 }
alshabib10580802015-02-18 18:30:33 -0800807 existing.setLife(group.life());
808 existing.setPackets(group.packets());
809 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700810 if ((existing.state() == GroupState.PENDING_ADD) ||
811 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700812 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
813 existing.id(),
814 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700815 existing.state());
alshabib10580802015-02-18 18:30:33 -0800816 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700817 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800818 event = new GroupEvent(Type.GROUP_ADDED, existing);
819 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700820 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
821 existing.id(),
822 existing.deviceId(),
823 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700824 existing.setState(GroupState.ADDED);
825 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800826 event = new GroupEvent(Type.GROUP_UPDATED, existing);
827 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700828 //Re-PUT map entries to trigger map update events
829 getGroupStoreKeyMap().
830 put(new GroupStoreKeyMapKey(existing.deviceId(),
831 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800832 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700833 } else {
834 log.warn("addOrUpdateGroupEntry: Group update "
835 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800836 }
837
838 if (event != null) {
839 notifyDelegate(event);
840 }
841 }
842
843 /**
844 * Removes the group entry from store.
845 *
846 * @param group group entry
847 */
848 @Override
849 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700850 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
851 group.id());
alshabib10580802015-02-18 18:30:33 -0800852
853 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700854 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700855 group.id(),
856 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700857 //Removal from groupid based map will happen in the
858 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700859 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
860 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800861 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700862 } else {
863 log.warn("removeGroupEntry for {} in device{} is "
864 + "not existing in our maps",
865 group.id(),
866 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800867 }
868 }
869
870 @Override
871 public void deviceInitialAuditCompleted(DeviceId deviceId,
872 boolean completed) {
873 synchronized (deviceAuditStatus) {
874 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700875 log.debug("AUDIT completed for device {}",
876 deviceId);
alshabib10580802015-02-18 18:30:33 -0800877 deviceAuditStatus.put(deviceId, true);
878 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700879 List<StoredGroupEntry> pendingGroupRequests =
880 getPendingGroupKeyTable().values()
881 .stream()
882 .filter(g-> g.deviceId().equals(deviceId))
883 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700884 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700885 deviceId,
886 pendingGroupRequests.size());
887 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800888 GroupDescription tmp = new DefaultGroupDescription(
889 group.deviceId(),
890 group.type(),
891 group.buckets(),
892 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700893 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800894 group.appId());
895 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700896 getPendingGroupKeyTable().
897 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800898 }
alshabib10580802015-02-18 18:30:33 -0800899 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700900 Boolean audited = deviceAuditStatus.get(deviceId);
901 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700902 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800903 deviceAuditStatus.put(deviceId, false);
904 }
905 }
906 }
907 }
908
909 @Override
910 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
911 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700912 Boolean audited = deviceAuditStatus.get(deviceId);
913 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800914 }
915 }
916
917 @Override
918 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
919
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700920 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
921 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800922
923 if (existing == null) {
924 log.warn("No group entry with ID {} found ", operation.groupId());
925 return;
926 }
927
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700928 log.warn("groupOperationFailed: group operation {} failed"
929 + "for group {} in device {}",
930 operation.opType(),
931 existing.id(),
932 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800933 switch (operation.opType()) {
934 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700935 if (existing.state() == GroupState.PENDING_ADD) {
936 //TODO: Need to add support for passing the group
937 //operation failure reason from group provider.
938 //If the error type is anything other than GROUP_EXISTS,
939 //then the GROUP_ADD_FAILED event should be raised even
940 //in PENDING_ADD_RETRY state also.
941 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
942 log.warn("groupOperationFailed: cleaningup "
943 + "group {} from store in device {}....",
944 existing.id(),
945 existing.deviceId());
946 //Removal from groupid based map will happen in the
947 //map update listener
948 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
949 existing.appCookie()));
950 }
alshabib10580802015-02-18 18:30:33 -0800951 break;
952 case MODIFY:
953 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
954 break;
955 case DELETE:
956 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
957 break;
958 default:
959 log.warn("Unknown group operation type {}", operation.opType());
960 }
alshabib10580802015-02-18 18:30:33 -0800961 }
962
963 @Override
964 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700965 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700966 group.id(),
967 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800968 ConcurrentMap<GroupId, Group> extraneousIdTable =
969 getExtraneousGroupIdTable(group.deviceId());
970 extraneousIdTable.put(group.id(), group);
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -0700971 // Don't remove the extraneous groups, instead re-use it when
972 // a group request comes with the same set of buckets
alshabib10580802015-02-18 18:30:33 -0800973 }
974
975 @Override
976 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700977 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700978 group.id(),
979 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800980 ConcurrentMap<GroupId, Group> extraneousIdTable =
981 getExtraneousGroupIdTable(group.deviceId());
982 extraneousIdTable.remove(group.id());
983 }
984
985 @Override
986 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
987 // flatten and make iterator unmodifiable
988 return FluentIterable.from(
989 getExtraneousGroupIdTable(deviceId).values());
990 }
991
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700992 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700993 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700994 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700995 private class GroupStoreKeyMapListener implements
996 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700997
998 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700999 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -07001000 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001001 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001002 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001003 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001004 if ((key == null) && (group == null)) {
1005 log.error("GroupStoreKeyMapListener: Received "
1006 + "event {} with null entry", mapEvent.type());
1007 return;
1008 } else if (group == null) {
1009 group = getGroupIdTable(key.deviceId()).values()
1010 .stream()
1011 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1012 .findFirst().get();
1013 if (group == null) {
1014 log.error("GroupStoreKeyMapListener: Received "
1015 + "event {} with null entry... can not process", mapEvent.type());
1016 return;
1017 }
1018 }
1019 log.trace("received groupid map event {} for id {} in device {}",
1020 mapEvent.type(),
1021 group.id(),
1022 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001023 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001024 // Update the group ID table
1025 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001026 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1027 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1028 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1029 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001030 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1031 group.id(),
1032 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001033 } else {
1034 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1035 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001036 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1037 group.id(),
1038 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001039 }
1040 }
1041 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001042 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -07001043 // Remove the entry from the group ID table
1044 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001045 }
1046
1047 if (groupEvent != null) {
1048 notifyDelegate(groupEvent);
1049 }
1050 }
1051 }
Madan Jampani01e05fb2015-08-13 13:29:36 -07001052
1053 private void process(GroupStoreMessage groupOp) {
1054 log.debug("Received remote group operation {} request for device {}",
1055 groupOp.type(),
1056 groupOp.deviceId());
1057 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1058 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1059 return;
1060 }
1061 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1062 storeGroupDescriptionInternal(groupOp.groupDesc());
1063 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1064 updateGroupDescriptionInternal(groupOp.deviceId(),
1065 groupOp.appCookie(),
1066 groupOp.updateType(),
1067 groupOp.updateBuckets(),
1068 groupOp.newAppCookie());
1069 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1070 deleteGroupDescriptionInternal(groupOp.deviceId(),
1071 groupOp.appCookie());
1072 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001073 }
1074
1075 /**
1076 * Flattened map key to be used to store group entries.
1077 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001078 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001079 private final DeviceId deviceId;
1080
1081 public GroupStoreMapKey(DeviceId deviceId) {
1082 this.deviceId = deviceId;
1083 }
1084
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001085 public DeviceId deviceId() {
1086 return deviceId;
1087 }
1088
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001089 @Override
1090 public boolean equals(Object o) {
1091 if (this == o) {
1092 return true;
1093 }
1094 if (!(o instanceof GroupStoreMapKey)) {
1095 return false;
1096 }
1097 GroupStoreMapKey that = (GroupStoreMapKey) o;
1098 return this.deviceId.equals(that.deviceId);
1099 }
1100
1101 @Override
1102 public int hashCode() {
1103 int result = 17;
1104
1105 result = 31 * result + Objects.hash(this.deviceId);
1106
1107 return result;
1108 }
1109 }
1110
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001111 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001112 private final GroupKey appCookie;
1113 public GroupStoreKeyMapKey(DeviceId deviceId,
1114 GroupKey appCookie) {
1115 super(deviceId);
1116 this.appCookie = appCookie;
1117 }
1118
1119 @Override
1120 public boolean equals(Object o) {
1121 if (this == o) {
1122 return true;
1123 }
1124 if (!(o instanceof GroupStoreKeyMapKey)) {
1125 return false;
1126 }
1127 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1128 return (super.equals(that) &&
1129 this.appCookie.equals(that.appCookie));
1130 }
1131
1132 @Override
1133 public int hashCode() {
1134 int result = 17;
1135
1136 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1137
1138 return result;
1139 }
1140 }
1141
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001142 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001143 private final GroupId groupId;
1144 public GroupStoreIdMapKey(DeviceId deviceId,
1145 GroupId groupId) {
1146 super(deviceId);
1147 this.groupId = groupId;
1148 }
1149
1150 @Override
1151 public boolean equals(Object o) {
1152 if (this == o) {
1153 return true;
1154 }
1155 if (!(o instanceof GroupStoreIdMapKey)) {
1156 return false;
1157 }
1158 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1159 return (super.equals(that) &&
1160 this.groupId.equals(that.groupId));
1161 }
1162
1163 @Override
1164 public int hashCode() {
1165 int result = 17;
1166
1167 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1168
1169 return result;
1170 }
1171 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001172
1173 @Override
1174 public void pushGroupMetrics(DeviceId deviceId,
1175 Collection<Group> groupEntries) {
1176 boolean deviceInitialAuditStatus =
1177 deviceInitialAuditStatus(deviceId);
1178 Set<Group> southboundGroupEntries =
1179 Sets.newHashSet(groupEntries);
1180 Set<StoredGroupEntry> storedGroupEntries =
1181 Sets.newHashSet(getStoredGroups(deviceId));
1182 Set<Group> extraneousStoredEntries =
1183 Sets.newHashSet(getExtraneousGroups(deviceId));
1184
1185 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1186 southboundGroupEntries.size(),
1187 deviceId);
1188 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1189 Group group = it.next();
1190 log.trace("Group {} in device {}", group, deviceId);
1191 }
1192
1193 log.trace("Displaying all ({}) stored group entries for device {}",
1194 storedGroupEntries.size(),
1195 deviceId);
1196 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1197 it1.hasNext();) {
1198 Group group = it1.next();
1199 log.trace("Stored Group {} for device {}", group, deviceId);
1200 }
1201
1202 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1203 Group group = it2.next();
1204 if (storedGroupEntries.remove(group)) {
1205 // we both have the group, let's update some info then.
1206 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1207 group.id(), deviceId);
1208 groupAdded(group);
1209 it2.remove();
1210 }
1211 }
1212 for (Group group : southboundGroupEntries) {
1213 if (getGroup(group.deviceId(), group.id()) != null) {
1214 // There is a group existing with the same id
1215 // It is possible that group update is
1216 // in progress while we got a stale info from switch
1217 if (!storedGroupEntries.remove(getGroup(
1218 group.deviceId(), group.id()))) {
1219 log.warn("Group AUDIT: Inconsistent state:"
1220 + "Group exists in ID based table while "
1221 + "not present in key based table");
1222 }
1223 } else {
1224 // there are groups in the switch that aren't in the store
1225 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1226 group.id(), deviceId);
1227 extraneousStoredEntries.remove(group);
1228 extraneousGroup(group);
1229 }
1230 }
1231 for (Group group : storedGroupEntries) {
1232 // there are groups in the store that aren't in the switch
1233 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1234 group.id(), deviceId);
1235 groupMissing(group);
1236 }
1237 for (Group group : extraneousStoredEntries) {
1238 // there are groups in the extraneous store that
1239 // aren't in the switch
1240 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1241 group.id(), deviceId);
1242 removeExtraneousGroupEntry(group);
1243 }
1244
1245 if (!deviceInitialAuditStatus) {
1246 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1247 deviceId);
1248 deviceInitialAuditCompleted(deviceId, true);
1249 }
1250 }
1251
1252 private void groupMissing(Group group) {
1253 switch (group.state()) {
1254 case PENDING_DELETE:
1255 log.debug("Group {} delete confirmation from device {}",
1256 group, group.deviceId());
1257 removeGroupEntry(group);
1258 break;
1259 case ADDED:
1260 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001261 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001262 case PENDING_UPDATE:
1263 log.debug("Group {} is in store but not on device {}",
1264 group, group.deviceId());
1265 StoredGroupEntry existing =
1266 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001267 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001268 existing.id(),
1269 existing.deviceId(),
1270 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001271 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001272 //Re-PUT map entries to trigger map update events
1273 getGroupStoreKeyMap().
1274 put(new GroupStoreKeyMapKey(existing.deviceId(),
1275 existing.appCookie()), existing);
1276 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1277 group));
1278 break;
1279 default:
1280 log.debug("Group {} has not been installed.", group);
1281 break;
1282 }
1283 }
1284
1285 private void extraneousGroup(Group group) {
1286 log.debug("Group {} is on device {} but not in store.",
1287 group, group.deviceId());
1288 addOrUpdateExtraneousGroupEntry(group);
1289 }
1290
1291 private void groupAdded(Group group) {
1292 log.trace("Group {} Added or Updated in device {}",
1293 group, group.deviceId());
1294 addOrUpdateGroupEntry(group);
1295 }
alshabib10580802015-02-18 18:30:33 -08001296}