blob: 2c682a148e8b7d3bb32e8969304b12b9d16615f9 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart63939a32015-05-08 11:57:03 -070063import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070064import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070065import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070066import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070067import org.onosproject.store.service.EventuallyConsistentMap;
68import org.onosproject.store.service.EventuallyConsistentMapBuilder;
69import org.onosproject.store.service.EventuallyConsistentMapEvent;
70import org.onosproject.store.service.EventuallyConsistentMapListener;
71import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080072import org.slf4j.Logger;
73
Jonathan Hart6ec029a2015-03-24 17:12:35 -070074import java.net.URI;
75import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070076import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070077import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070078import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070079import java.util.List;
80import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070081import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070082import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070083import java.util.concurrent.ConcurrentHashMap;
84import java.util.concurrent.ConcurrentMap;
85import java.util.concurrent.ExecutorService;
86import java.util.concurrent.Executors;
87import java.util.concurrent.atomic.AtomicInteger;
88import java.util.concurrent.atomic.AtomicLong;
89import java.util.stream.Collectors;
90
91import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080094
95/**
 * Manages inventory of group entries using eventually consistent maps shared across the cluster.
97 */
98@Component(immediate = true)
99@Service
100public class DistributedGroupStore
101 extends AbstractStore<GroupEvent, GroupStoreDelegate>
102 implements GroupStore {
103
    private final Logger log = getLogger(getClass());

    // Placeholder id given to group entries that are queued while the
    // device audit is still pending (see storeGroupDescriptionInternal)
    private final int dummyId = 0xffffffff;
    private final GroupId dummyGroupId = new DefaultGroupId(dummyId);

    // Used to subscribe to, and unicast, remote group operation requests
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Supplies the builders for the eventually consistent maps below
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    // Consulted to decide whether an operation is handled locally or
    // forwarded to the device master
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    // Per device group table with (device id + app cookie) as key
    private EventuallyConsistentMap<GroupStoreKeyMapKey,
        StoredGroupEntry> groupStoreEntriesByKey = null;
    // Per device group table with (device id + group id) as key
    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
        groupEntriesById = new ConcurrentHashMap<>();
    // Group add requests received before the device audit completed,
    // keyed by (device id + app cookie)
    private EventuallyConsistentMap<GroupStoreKeyMapKey,
        StoredGroupEntry> auditPendingReqQueue = null;
    // Per device table of extraneous groups with group id as key;
    // consulted when picking a free group id
    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
        extraneousGroupEntriesById = new ConcurrentHashMap<>();
    // Executor on which remote group operation requests are processed
    private ExecutorService messageHandlingExecutor;
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;

    // Audit status per device; a missing entry is treated as "audit not
    // yet completed" by storeGroupDescriptionInternal
    // NOTE(review): plain HashMap - confirm all accesses are guarded
    private final HashMap<DeviceId, Boolean> deviceAuditStatus =
            new HashMap<DeviceId, Boolean>();

    // Source of candidate group ids (see getFreeGroupIdValue)
    private final AtomicInteger groupIdGen = new AtomicInteger();

    // Serializer namespace builder; populated in activate()
    private KryoNamespace.Builder kryoBuilder = null;

    // Second component of the timestamps attached to map updates
    private final AtomicLong sequenceNumber = new AtomicLong(0);
142
    /**
     * Activates the store: builds the serializer namespace, starts the
     * executor that processes remote group operation requests, subscribes
     * to those requests and creates the two eventually consistent maps
     * (group key map and pending-audit request map).
     */
    @Activate
    public void activate() {
        // Types that may travel in group store messages or map entries.
        // NOTE(review): registration order presumably determines the
        // serialized type ids - confirm before reordering.
        kryoBuilder = new KryoNamespace.Builder()
            .register(DefaultGroup.class,
                      DefaultGroupBucket.class,
                      DefaultGroupDescription.class,
                      DefaultGroupKey.class,
                      GroupDescription.Type.class,
                      Group.GroupState.class,
                      GroupBuckets.class,
                      DefaultGroupId.class,
                      GroupStoreMessage.class,
                      GroupStoreMessage.Type.class,
                      UpdateType.class,
                      GroupStoreMessageSubjects.class,
                      MultiValuedTimestamp.class,
                      GroupStoreKeyMapKey.class,
                      GroupStoreIdMapKey.class,
                      GroupStoreMapKey.class
            )
            .register(new URISerializer(), URI.class)
            .register(new DeviceIdSerializer(), DeviceId.class)
            .register(PortNumber.class)
            .register(DefaultApplicationId.class)
            .register(DefaultTrafficTreatment.class,
                      Instructions.DropInstruction.class,
                      Instructions.OutputInstruction.class,
                      Instructions.GroupInstruction.class,
                      Instructions.TableTypeTransition.class,
                      FlowRule.Type.class,
                      L0ModificationInstruction.class,
                      L0ModificationInstruction.L0SubType.class,
                      L0ModificationInstruction.ModLambdaInstruction.class,
                      L2ModificationInstruction.class,
                      L2ModificationInstruction.L2SubType.class,
                      L2ModificationInstruction.ModEtherInstruction.class,
                      L2ModificationInstruction.PushHeaderInstructions.class,
                      L2ModificationInstruction.ModVlanIdInstruction.class,
                      L2ModificationInstruction.ModVlanPcpInstruction.class,
                      L2ModificationInstruction.ModMplsLabelInstruction.class,
                      L2ModificationInstruction.ModMplsTtlInstruction.class,
                      L3ModificationInstruction.class,
                      L3ModificationInstruction.L3SubType.class,
                      L3ModificationInstruction.ModIPInstruction.class,
                      L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
                      L3ModificationInstruction.ModTtlInstruction.class,
                      org.onlab.packet.MplsLabel.class
            )
            .register(org.onosproject.cluster.NodeId.class)
            .register(KryoNamespaces.BASIC)
            .register(KryoNamespaces.MISC);

        // Fixed-size pool on which incoming remote requests are handled
        messageHandlingExecutor = Executors.
                newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
                                   groupedThreads("onos/store/group",
                                                  "message-handlers"));

        // Requests forwarded by other instances are deserialized and
        // handed to process() on the executor above
        clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
                kryoBuilder.build()::deserialize,
                this::process,
                messageHandlingExecutor);

        log.debug("Creating EC map groupstorekeymap");
        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
                keyMapBuilder = storageService.eventuallyConsistentMapBuilder();

        // Each update is stamped with (wall-clock millis, local sequence number)
        groupStoreEntriesByKey = keyMapBuilder
                .withName("groupstorekeymap")
                .withSerializer(kryoBuilder)
                .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
                                                                            sequenceNumber.getAndIncrement()))
                .build();
        groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
        log.debug("Current size of groupstorekeymap:{}",
                  groupStoreEntriesByKey.size());

        log.debug("Creating EC map pendinggroupkeymap");
        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
                auditMapBuilder = storageService.eventuallyConsistentMapBuilder();

        // Same timestamp scheme as the key map; no listener is attached here
        auditPendingReqQueue = auditMapBuilder
                .withName("pendinggroupkeymap")
                .withSerializer(kryoBuilder)
                .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
                                                                            sequenceNumber.getAndIncrement()))
                .build();
        log.debug("Current size of pendinggroupkeymap:{}",
                  auditPendingReqQueue.size());

        log.info("Started");
    }
234
235 @Deactivate
236 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700238 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800239 log.info("Stopped");
240 }
241
alshabib10580802015-02-18 18:30:33 -0800242 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700243 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800244 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
245 }
246
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700247 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
248 lazyEmptyGroupIdTable() {
249 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
250 }
251
alshabib10580802015-02-18 18:30:33 -0800252 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700253 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800254 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700255 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800256 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700257 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
258 getGroupStoreKeyMap() {
259 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800260 }
261
262 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700263 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800264 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700265 * @param deviceId identifier of the device
266 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800267 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700268 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
269 return createIfAbsentUnchecked(groupEntriesById,
270 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800271 }
272
273 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700274 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800275 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700276 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800277 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700278 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
279 getPendingGroupKeyTable() {
280 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800281 }
282
283 /**
284 * Returns the extraneous group id table for specified device.
285 *
286 * @param deviceId identifier of the device
287 * @return Map representing group key table of given device.
288 */
289 private ConcurrentMap<GroupId, Group>
290 getExtraneousGroupIdTable(DeviceId deviceId) {
291 return createIfAbsentUnchecked(extraneousGroupEntriesById,
292 deviceId,
293 lazyEmptyExtraneousGroupIdTable());
294 }
295
296 /**
297 * Returns the number of groups for the specified device in the store.
298 *
299 * @return number of groups for the specified device
300 */
301 @Override
302 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700303 return (getGroups(deviceId) != null) ?
304 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800305 }
306
307 /**
308 * Returns the groups associated with a device.
309 *
310 * @param deviceId the device ID
311 *
312 * @return the group entries
313 */
314 @Override
315 public Iterable<Group> getGroups(DeviceId deviceId) {
316 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700317 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700318 deviceId, getGroupStoreKeyMap().values().size());
319 return FluentIterable.from(getGroupStoreKeyMap().values())
320 .filter(input -> input.deviceId().equals(deviceId))
321 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800322 }
323
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700324 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
325 // flatten and make iterator unmodifiable
326 log.debug("getGroups: for device {} total number of groups {}",
327 deviceId, getGroupStoreKeyMap().values().size());
328 return FluentIterable.from(getGroupStoreKeyMap().values())
329 .filter(input -> input.deviceId().equals(deviceId));
330 }
331
alshabib10580802015-02-18 18:30:33 -0800332 /**
333 * Returns the stored group entry.
334 *
335 * @param deviceId the device ID
336 * @param appCookie the group key
337 *
338 * @return a group associated with the key
339 */
340 @Override
341 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700342 return getStoredGroupEntry(deviceId, appCookie);
343 }
344
345 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
346 GroupKey appCookie) {
347 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
348 appCookie));
349 }
350
351 @Override
352 public Group getGroup(DeviceId deviceId, GroupId groupId) {
353 return getStoredGroupEntry(deviceId, groupId);
354 }
355
356 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
357 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700358 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800359 }
360
361 private int getFreeGroupIdValue(DeviceId deviceId) {
362 int freeId = groupIdGen.incrementAndGet();
363
364 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700365 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800366 if (existing == null) {
367 existing = (
368 extraneousGroupEntriesById.get(deviceId) != null) ?
369 extraneousGroupEntriesById.get(deviceId).
370 get(new DefaultGroupId(freeId)) :
371 null;
372 }
373 if (existing != null) {
374 freeId = groupIdGen.incrementAndGet();
375 } else {
376 break;
377 }
378 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700379 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800380 return freeId;
381 }
382
383 /**
384 * Stores a new group entry using the information from group description.
385 *
386 * @param groupDesc group description to be used to create group entry
387 */
388 @Override
389 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700390 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800391 // Check if a group is existing with the same key
392 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700393 log.warn("Group already exists with the same key {}",
394 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800395 return;
396 }
397
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700398 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700399 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700400 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700401 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700402 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
403 log.error("No Master for device {}..."
404 + "Can not perform add group operation",
405 groupDesc.deviceId());
406 //TODO: Send Group operation failure event
407 return;
408 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700409 GroupStoreMessage groupOp = GroupStoreMessage.
410 createGroupAddRequestMsg(groupDesc.deviceId(),
411 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700412
Madan Jampani175e8fd2015-05-20 14:10:45 -0700413 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700414 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
415 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700416 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
417 if (error != null) {
418 log.warn("Failed to send request to master: {} to {}",
419 groupOp,
420 mastershipService.getMasterFor(groupDesc.deviceId()));
421 //TODO: Send Group operation failure event
422 } else {
423 log.debug("Sent Group operation request for device {} "
424 + "to remote MASTER {}",
425 groupDesc.deviceId(),
426 mastershipService.getMasterFor(groupDesc.deviceId()));
427 }
428 });
alshabib10580802015-02-18 18:30:33 -0800429 return;
430 }
431
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700432 log.debug("Store group for device {} is getting handled locally",
433 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800434 storeGroupDescriptionInternal(groupDesc);
435 }
436
437 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
438 // Check if a group is existing with the same key
439 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
440 return;
441 }
442
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700443 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
444 // Device group audit has not completed yet
445 // Add this group description to pending group key table
446 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700447 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700448 groupDesc.deviceId());
449 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
450 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
451 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
452 getPendingGroupKeyTable();
453 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
454 groupDesc.appCookie()),
455 group);
456 return;
457 }
458
Saurav Das100e3b82015-04-30 11:12:10 -0700459 GroupId id = null;
460 if (groupDesc.givenGroupId() == null) {
461 // Get a new group identifier
462 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
463 } else {
464 id = new DefaultGroupId(groupDesc.givenGroupId());
465 }
alshabib10580802015-02-18 18:30:33 -0800466 // Create a group entry object
467 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700468 // Insert the newly created group entry into key and id maps
469 getGroupStoreKeyMap().
470 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
471 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700472 // Ensure it also inserted into group id based table to
473 // avoid any chances of duplication in group id generation
474 getGroupIdTable(groupDesc.deviceId()).
475 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700476 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
477 id,
478 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800479 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
480 group));
481 }
482
483 /**
484 * Updates the existing group entry with the information
485 * from group description.
486 *
487 * @param deviceId the device ID
488 * @param oldAppCookie the current group key
489 * @param type update type
490 * @param newBuckets group buckets for updates
491 * @param newAppCookie optional new group key
492 */
493 @Override
494 public void updateGroupDescription(DeviceId deviceId,
495 GroupKey oldAppCookie,
496 UpdateType type,
497 GroupBuckets newBuckets,
498 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700499 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700500 if (mastershipService.getMasterFor(deviceId) != null &&
501 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700502 log.debug("updateGroupDescription: Device {} local role is not MASTER",
503 deviceId);
504 if (mastershipService.getMasterFor(deviceId) == null) {
505 log.error("No Master for device {}..."
506 + "Can not perform update group operation",
507 deviceId);
508 //TODO: Send Group operation failure event
509 return;
510 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700511 GroupStoreMessage groupOp = GroupStoreMessage.
512 createGroupUpdateRequestMsg(deviceId,
513 oldAppCookie,
514 type,
515 newBuckets,
516 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700517
Madan Jampani175e8fd2015-05-20 14:10:45 -0700518 clusterCommunicator.unicast(groupOp,
519 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
520 m -> kryoBuilder.build().serialize(m),
521 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
522 if (error != null) {
523 log.warn("Failed to send request to master: {} to {}",
524 groupOp,
525 mastershipService.getMasterFor(deviceId), error);
526 }
527 //TODO: Send Group operation failure event
528 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700529 return;
530 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700531 log.debug("updateGroupDescription for device {} is getting handled locally",
532 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700533 updateGroupDescriptionInternal(deviceId,
534 oldAppCookie,
535 type,
536 newBuckets,
537 newAppCookie);
538 }
539
540 private void updateGroupDescriptionInternal(DeviceId deviceId,
541 GroupKey oldAppCookie,
542 UpdateType type,
543 GroupBuckets newBuckets,
544 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800545 // Check if a group is existing with the provided key
546 Group oldGroup = getGroup(deviceId, oldAppCookie);
547 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700548 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800549 return;
550 }
551
552 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
553 type,
554 newBuckets);
555 if (newBucketList != null) {
556 // Create a new group object from the old group
557 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
558 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
559 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
560 oldGroup.deviceId(),
561 oldGroup.type(),
562 updatedBuckets,
563 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700564 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800565 oldGroup.appId());
566 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
567 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700568 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
569 oldGroup.id(),
570 oldGroup.deviceId(),
571 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800572 newGroup.setState(GroupState.PENDING_UPDATE);
573 newGroup.setLife(oldGroup.life());
574 newGroup.setPackets(oldGroup.packets());
575 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700576 //Update the group entry in groupkey based map.
577 //Update to groupid based map will happen in the
578 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700579 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
580 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700581 getGroupStoreKeyMap().
582 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
583 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800584 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700585 } else {
586 log.warn("updateGroupDescriptionInternal with type {}: No "
587 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800588 }
589 }
590
591 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
592 UpdateType type,
593 GroupBuckets buckets) {
594 GroupBuckets oldBuckets = oldGroup.buckets();
595 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
596 oldBuckets.buckets());
597 boolean groupDescUpdated = false;
598
599 if (type == UpdateType.ADD) {
600 // Check if the any of the new buckets are part of
601 // the old bucket list
602 for (GroupBucket addBucket:buckets.buckets()) {
603 if (!newBucketList.contains(addBucket)) {
604 newBucketList.add(addBucket);
605 groupDescUpdated = true;
606 }
607 }
608 } else if (type == UpdateType.REMOVE) {
609 // Check if the to be removed buckets are part of the
610 // old bucket list
611 for (GroupBucket removeBucket:buckets.buckets()) {
612 if (newBucketList.contains(removeBucket)) {
613 newBucketList.remove(removeBucket);
614 groupDescUpdated = true;
615 }
616 }
617 }
618
619 if (groupDescUpdated) {
620 return newBucketList;
621 } else {
622 return null;
623 }
624 }
625
626 /**
627 * Triggers deleting the existing group entry.
628 *
629 * @param deviceId the device ID
630 * @param appCookie the group key
631 */
632 @Override
633 public void deleteGroupDescription(DeviceId deviceId,
634 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700635 // Check if group to be deleted by a remote instance
636 if (mastershipService.
637 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700638 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
639 deviceId);
640 if (mastershipService.getMasterFor(deviceId) == null) {
641 log.error("No Master for device {}..."
642 + "Can not perform delete group operation",
643 deviceId);
644 //TODO: Send Group operation failure event
645 return;
646 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700647 GroupStoreMessage groupOp = GroupStoreMessage.
648 createGroupDeleteRequestMsg(deviceId,
649 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700650
Madan Jampani175e8fd2015-05-20 14:10:45 -0700651 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700652 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
653 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700654 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
655 if (error != null) {
656 log.warn("Failed to send request to master: {} to {}",
657 groupOp,
658 mastershipService.getMasterFor(deviceId), error);
659 }
660 //TODO: Send Group operation failure event
661 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700662 return;
663 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700664 log.debug("deleteGroupDescription in device {} is getting handled locally",
665 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700666 deleteGroupDescriptionInternal(deviceId, appCookie);
667 }
668
669 private void deleteGroupDescriptionInternal(DeviceId deviceId,
670 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800671 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700672 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800673 if (existing == null) {
674 return;
675 }
676
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700677 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
678 existing.id(),
679 existing.deviceId(),
680 existing.state());
alshabib10580802015-02-18 18:30:33 -0800681 synchronized (existing) {
682 existing.setState(GroupState.PENDING_DELETE);
683 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700684 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
685 deviceId);
alshabib10580802015-02-18 18:30:33 -0800686 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
687 }
688
689 /**
690 * Stores a new group entry, or updates an existing entry.
691 *
692 * @param group group entry
693 */
694 @Override
695 public void addOrUpdateGroupEntry(Group group) {
696 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700697 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
698 group.id());
alshabib10580802015-02-18 18:30:33 -0800699 GroupEvent event = null;
700
701 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700702 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700703 group.id(),
704 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800705 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700706 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700707 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700708 existing.buckets().buckets()
709 .stream()
710 .filter((existingBucket)->(existingBucket.equals(bucket)))
711 .findFirst();
712 if (matchingBucket.isPresent()) {
713 ((StoredGroupBucketEntry) matchingBucket.
714 get()).setPackets(bucket.packets());
715 ((StoredGroupBucketEntry) matchingBucket.
716 get()).setBytes(bucket.bytes());
717 } else {
718 log.warn("addOrUpdateGroupEntry: No matching "
719 + "buckets to update stats");
720 }
721 }
alshabib10580802015-02-18 18:30:33 -0800722 existing.setLife(group.life());
723 existing.setPackets(group.packets());
724 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700725 if ((existing.state() == GroupState.PENDING_ADD) ||
726 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700727 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
728 existing.id(),
729 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700730 existing.state());
alshabib10580802015-02-18 18:30:33 -0800731 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700732 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800733 event = new GroupEvent(Type.GROUP_ADDED, existing);
734 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700735 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
736 existing.id(),
737 existing.deviceId(),
738 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700739 existing.setState(GroupState.ADDED);
740 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800741 event = new GroupEvent(Type.GROUP_UPDATED, existing);
742 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700743 //Re-PUT map entries to trigger map update events
744 getGroupStoreKeyMap().
745 put(new GroupStoreKeyMapKey(existing.deviceId(),
746 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800747 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700748 } else {
749 log.warn("addOrUpdateGroupEntry: Group update "
750 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800751 }
752
753 if (event != null) {
754 notifyDelegate(event);
755 }
756 }
757
758 /**
759 * Removes the group entry from store.
760 *
761 * @param group group entry
762 */
763 @Override
764 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700765 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
766 group.id());
alshabib10580802015-02-18 18:30:33 -0800767
768 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700769 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700770 group.id(),
771 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700772 //Removal from groupid based map will happen in the
773 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700774 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
775 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800776 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700777 } else {
778 log.warn("removeGroupEntry for {} in device{} is "
779 + "not existing in our maps",
780 group.id(),
781 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800782 }
783 }
784
785 @Override
786 public void deviceInitialAuditCompleted(DeviceId deviceId,
787 boolean completed) {
788 synchronized (deviceAuditStatus) {
789 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700790 log.debug("AUDIT completed for device {}",
791 deviceId);
alshabib10580802015-02-18 18:30:33 -0800792 deviceAuditStatus.put(deviceId, true);
793 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700794 List<StoredGroupEntry> pendingGroupRequests =
795 getPendingGroupKeyTable().values()
796 .stream()
797 .filter(g-> g.deviceId().equals(deviceId))
798 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700799 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700800 deviceId,
801 pendingGroupRequests.size());
802 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800803 GroupDescription tmp = new DefaultGroupDescription(
804 group.deviceId(),
805 group.type(),
806 group.buckets(),
807 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700808 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800809 group.appId());
810 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700811 getPendingGroupKeyTable().
812 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800813 }
alshabib10580802015-02-18 18:30:33 -0800814 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700815 Boolean audited = deviceAuditStatus.get(deviceId);
816 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700817 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800818 deviceAuditStatus.put(deviceId, false);
819 }
820 }
821 }
822 }
823
824 @Override
825 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
826 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700827 Boolean audited = deviceAuditStatus.get(deviceId);
828 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800829 }
830 }
831
832 @Override
833 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
834
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700835 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
836 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800837
838 if (existing == null) {
839 log.warn("No group entry with ID {} found ", operation.groupId());
840 return;
841 }
842
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700843 log.warn("groupOperationFailed: group operation {} failed"
844 + "for group {} in device {}",
845 operation.opType(),
846 existing.id(),
847 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800848 switch (operation.opType()) {
849 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700850 if (existing.state() == GroupState.PENDING_ADD) {
851 //TODO: Need to add support for passing the group
852 //operation failure reason from group provider.
853 //If the error type is anything other than GROUP_EXISTS,
854 //then the GROUP_ADD_FAILED event should be raised even
855 //in PENDING_ADD_RETRY state also.
856 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
857 log.warn("groupOperationFailed: cleaningup "
858 + "group {} from store in device {}....",
859 existing.id(),
860 existing.deviceId());
861 //Removal from groupid based map will happen in the
862 //map update listener
863 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
864 existing.appCookie()));
865 }
alshabib10580802015-02-18 18:30:33 -0800866 break;
867 case MODIFY:
868 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
869 break;
870 case DELETE:
871 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
872 break;
873 default:
874 log.warn("Unknown group operation type {}", operation.opType());
875 }
alshabib10580802015-02-18 18:30:33 -0800876 }
877
878 @Override
879 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700880 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700881 group.id(),
882 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800883 ConcurrentMap<GroupId, Group> extraneousIdTable =
884 getExtraneousGroupIdTable(group.deviceId());
885 extraneousIdTable.put(group.id(), group);
886 // Check the reference counter
887 if (group.referenceCount() == 0) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700888 log.debug("Flow reference counter is zero and triggering remove",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700889 group.id(),
890 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800891 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
892 }
893 }
894
895 @Override
896 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700897 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700898 group.id(),
899 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800900 ConcurrentMap<GroupId, Group> extraneousIdTable =
901 getExtraneousGroupIdTable(group.deviceId());
902 extraneousIdTable.remove(group.id());
903 }
904
905 @Override
906 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
907 // flatten and make iterator unmodifiable
908 return FluentIterable.from(
909 getExtraneousGroupIdTable(deviceId).values());
910 }
911
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700912 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700913 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700914 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700915 private class GroupStoreKeyMapListener implements
916 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700917
918 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700919 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700920 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700921 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700922 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700923 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700924 if ((key == null) && (group == null)) {
925 log.error("GroupStoreKeyMapListener: Received "
926 + "event {} with null entry", mapEvent.type());
927 return;
928 } else if (group == null) {
929 group = getGroupIdTable(key.deviceId()).values()
930 .stream()
931 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
932 .findFirst().get();
933 if (group == null) {
934 log.error("GroupStoreKeyMapListener: Received "
935 + "event {} with null entry... can not process", mapEvent.type());
936 return;
937 }
938 }
939 log.trace("received groupid map event {} for id {} in device {}",
940 mapEvent.type(),
941 group.id(),
942 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700943 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700944 // Update the group ID table
945 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700946 if (mapEvent.value().state() == Group.GroupState.ADDED) {
947 if (mapEvent.value().isGroupStateAddedFirstTime()) {
948 groupEvent = new GroupEvent(Type.GROUP_ADDED,
949 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700950 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
951 group.id(),
952 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700953 } else {
954 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
955 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700956 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
957 group.id(),
958 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700959 }
960 }
961 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700962 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700963 // Remove the entry from the group ID table
964 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700965 }
966
967 if (groupEvent != null) {
968 notifyDelegate(groupEvent);
969 }
970 }
971 }
Madan Jampani01e05fb2015-08-13 13:29:36 -0700972
973 private void process(GroupStoreMessage groupOp) {
974 log.debug("Received remote group operation {} request for device {}",
975 groupOp.type(),
976 groupOp.deviceId());
977 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
978 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
979 return;
980 }
981 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
982 storeGroupDescriptionInternal(groupOp.groupDesc());
983 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
984 updateGroupDescriptionInternal(groupOp.deviceId(),
985 groupOp.appCookie(),
986 groupOp.updateType(),
987 groupOp.updateBuckets(),
988 groupOp.newAppCookie());
989 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
990 deleteGroupDescriptionInternal(groupOp.deviceId(),
991 groupOp.appCookie());
992 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700993 }
994
995 /**
996 * Flattened map key to be used to store group entries.
997 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700998 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700999 private final DeviceId deviceId;
1000
1001 public GroupStoreMapKey(DeviceId deviceId) {
1002 this.deviceId = deviceId;
1003 }
1004
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001005 public DeviceId deviceId() {
1006 return deviceId;
1007 }
1008
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001009 @Override
1010 public boolean equals(Object o) {
1011 if (this == o) {
1012 return true;
1013 }
1014 if (!(o instanceof GroupStoreMapKey)) {
1015 return false;
1016 }
1017 GroupStoreMapKey that = (GroupStoreMapKey) o;
1018 return this.deviceId.equals(that.deviceId);
1019 }
1020
1021 @Override
1022 public int hashCode() {
1023 int result = 17;
1024
1025 result = 31 * result + Objects.hash(this.deviceId);
1026
1027 return result;
1028 }
1029 }
1030
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001031 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001032 private final GroupKey appCookie;
1033 public GroupStoreKeyMapKey(DeviceId deviceId,
1034 GroupKey appCookie) {
1035 super(deviceId);
1036 this.appCookie = appCookie;
1037 }
1038
1039 @Override
1040 public boolean equals(Object o) {
1041 if (this == o) {
1042 return true;
1043 }
1044 if (!(o instanceof GroupStoreKeyMapKey)) {
1045 return false;
1046 }
1047 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1048 return (super.equals(that) &&
1049 this.appCookie.equals(that.appCookie));
1050 }
1051
1052 @Override
1053 public int hashCode() {
1054 int result = 17;
1055
1056 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1057
1058 return result;
1059 }
1060 }
1061
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001062 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001063 private final GroupId groupId;
1064 public GroupStoreIdMapKey(DeviceId deviceId,
1065 GroupId groupId) {
1066 super(deviceId);
1067 this.groupId = groupId;
1068 }
1069
1070 @Override
1071 public boolean equals(Object o) {
1072 if (this == o) {
1073 return true;
1074 }
1075 if (!(o instanceof GroupStoreIdMapKey)) {
1076 return false;
1077 }
1078 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1079 return (super.equals(that) &&
1080 this.groupId.equals(that.groupId));
1081 }
1082
1083 @Override
1084 public int hashCode() {
1085 int result = 17;
1086
1087 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1088
1089 return result;
1090 }
1091 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001092
1093 @Override
1094 public void pushGroupMetrics(DeviceId deviceId,
1095 Collection<Group> groupEntries) {
1096 boolean deviceInitialAuditStatus =
1097 deviceInitialAuditStatus(deviceId);
1098 Set<Group> southboundGroupEntries =
1099 Sets.newHashSet(groupEntries);
1100 Set<StoredGroupEntry> storedGroupEntries =
1101 Sets.newHashSet(getStoredGroups(deviceId));
1102 Set<Group> extraneousStoredEntries =
1103 Sets.newHashSet(getExtraneousGroups(deviceId));
1104
1105 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1106 southboundGroupEntries.size(),
1107 deviceId);
1108 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1109 Group group = it.next();
1110 log.trace("Group {} in device {}", group, deviceId);
1111 }
1112
1113 log.trace("Displaying all ({}) stored group entries for device {}",
1114 storedGroupEntries.size(),
1115 deviceId);
1116 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1117 it1.hasNext();) {
1118 Group group = it1.next();
1119 log.trace("Stored Group {} for device {}", group, deviceId);
1120 }
1121
1122 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1123 Group group = it2.next();
1124 if (storedGroupEntries.remove(group)) {
1125 // we both have the group, let's update some info then.
1126 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1127 group.id(), deviceId);
1128 groupAdded(group);
1129 it2.remove();
1130 }
1131 }
1132 for (Group group : southboundGroupEntries) {
1133 if (getGroup(group.deviceId(), group.id()) != null) {
1134 // There is a group existing with the same id
1135 // It is possible that group update is
1136 // in progress while we got a stale info from switch
1137 if (!storedGroupEntries.remove(getGroup(
1138 group.deviceId(), group.id()))) {
1139 log.warn("Group AUDIT: Inconsistent state:"
1140 + "Group exists in ID based table while "
1141 + "not present in key based table");
1142 }
1143 } else {
1144 // there are groups in the switch that aren't in the store
1145 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1146 group.id(), deviceId);
1147 extraneousStoredEntries.remove(group);
1148 extraneousGroup(group);
1149 }
1150 }
1151 for (Group group : storedGroupEntries) {
1152 // there are groups in the store that aren't in the switch
1153 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1154 group.id(), deviceId);
1155 groupMissing(group);
1156 }
1157 for (Group group : extraneousStoredEntries) {
1158 // there are groups in the extraneous store that
1159 // aren't in the switch
1160 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1161 group.id(), deviceId);
1162 removeExtraneousGroupEntry(group);
1163 }
1164
1165 if (!deviceInitialAuditStatus) {
1166 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1167 deviceId);
1168 deviceInitialAuditCompleted(deviceId, true);
1169 }
1170 }
1171
1172 private void groupMissing(Group group) {
1173 switch (group.state()) {
1174 case PENDING_DELETE:
1175 log.debug("Group {} delete confirmation from device {}",
1176 group, group.deviceId());
1177 removeGroupEntry(group);
1178 break;
1179 case ADDED:
1180 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001181 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001182 case PENDING_UPDATE:
1183 log.debug("Group {} is in store but not on device {}",
1184 group, group.deviceId());
1185 StoredGroupEntry existing =
1186 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001187 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001188 existing.id(),
1189 existing.deviceId(),
1190 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001191 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001192 //Re-PUT map entries to trigger map update events
1193 getGroupStoreKeyMap().
1194 put(new GroupStoreKeyMapKey(existing.deviceId(),
1195 existing.appCookie()), existing);
1196 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1197 group));
1198 break;
1199 default:
1200 log.debug("Group {} has not been installed.", group);
1201 break;
1202 }
1203 }
1204
1205 private void extraneousGroup(Group group) {
1206 log.debug("Group {} is on device {} but not in store.",
1207 group, group.deviceId());
1208 addOrUpdateExtraneousGroupEntry(group);
1209 }
1210
1211 private void groupAdded(Group group) {
1212 log.trace("Group {} Added or Updated in device {}",
1213 group, group.deviceId());
1214 addOrUpdateGroupEntry(group);
1215 }
alshabib10580802015-02-18 18:30:33 -08001216}