blob: 8a732c8cd8e83936ce1158da40c28b06f1d48fed [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.Timestamp;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.cluster.messaging.ClusterMessage;
65import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jonathan Hart63939a32015-05-08 11:57:03 -070066import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070067import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070068import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070069import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070070import org.onosproject.store.service.ClockService;
71import org.onosproject.store.service.EventuallyConsistentMap;
72import org.onosproject.store.service.EventuallyConsistentMapBuilder;
73import org.onosproject.store.service.EventuallyConsistentMapEvent;
74import org.onosproject.store.service.EventuallyConsistentMapListener;
75import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080076import org.slf4j.Logger;
77
Jonathan Hart6ec029a2015-03-24 17:12:35 -070078import java.net.URI;
79import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070080import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070082import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070083import java.util.List;
84import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070085import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070086import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070087import java.util.concurrent.ConcurrentHashMap;
88import java.util.concurrent.ConcurrentMap;
89import java.util.concurrent.ExecutorService;
90import java.util.concurrent.Executors;
91import java.util.concurrent.atomic.AtomicInteger;
92import java.util.concurrent.atomic.AtomicLong;
93import java.util.stream.Collectors;
94
95import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
96import static org.onlab.util.Tools.groupedThreads;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080098
/**
 * Manages inventory of group entries using eventually consistent maps
 * and cluster messaging towards the master of each device.
 */
102@Component(immediate = true)
103@Service
104public class DistributedGroupStore
105 extends AbstractStore<GroupEvent, GroupStoreDelegate>
106 implements GroupStore {
107
108 private final Logger log = getLogger(getClass());
109
110 private final int dummyId = 0xffffffff;
111 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
112
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterCommunicationService clusterCommunicator;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterService clusterService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700120 protected StorageService storageService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700123 protected MastershipService mastershipService;
124
125 // Per device group table with (device id + app cookie) as key
126 private EventuallyConsistentMap<GroupStoreKeyMapKey,
127 StoredGroupEntry> groupStoreEntriesByKey = null;
128 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
130 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700131 private EventuallyConsistentMap<GroupStoreKeyMapKey,
132 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800133 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
134 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700135 private ExecutorService messageHandlingExecutor;
136 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800137
138 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
139 new HashMap<DeviceId, Boolean>();
140
141 private final AtomicInteger groupIdGen = new AtomicInteger();
142
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700143 private KryoNamespace.Builder kryoBuilder = null;
144
alshabib10580802015-02-18 18:30:33 -0800145 @Activate
146 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 kryoBuilder = new KryoNamespace.Builder()
148 .register(DefaultGroup.class,
149 DefaultGroupBucket.class,
150 DefaultGroupDescription.class,
151 DefaultGroupKey.class,
152 GroupDescription.Type.class,
153 Group.GroupState.class,
154 GroupBuckets.class,
155 DefaultGroupId.class,
156 GroupStoreMessage.class,
157 GroupStoreMessage.Type.class,
158 UpdateType.class,
159 GroupStoreMessageSubjects.class,
160 MultiValuedTimestamp.class,
161 GroupStoreKeyMapKey.class,
162 GroupStoreIdMapKey.class,
163 GroupStoreMapKey.class
164 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700165 .register(new URISerializer(), URI.class)
166 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700167 .register(PortNumber.class)
168 .register(DefaultApplicationId.class)
169 .register(DefaultTrafficTreatment.class,
170 Instructions.DropInstruction.class,
171 Instructions.OutputInstruction.class,
172 Instructions.GroupInstruction.class,
173 Instructions.TableTypeTransition.class,
174 FlowRule.Type.class,
175 L0ModificationInstruction.class,
176 L0ModificationInstruction.L0SubType.class,
177 L0ModificationInstruction.ModLambdaInstruction.class,
178 L2ModificationInstruction.class,
179 L2ModificationInstruction.L2SubType.class,
180 L2ModificationInstruction.ModEtherInstruction.class,
181 L2ModificationInstruction.PushHeaderInstructions.class,
182 L2ModificationInstruction.ModVlanIdInstruction.class,
183 L2ModificationInstruction.ModVlanPcpInstruction.class,
184 L2ModificationInstruction.ModMplsLabelInstruction.class,
185 L2ModificationInstruction.ModMplsTtlInstruction.class,
186 L3ModificationInstruction.class,
187 L3ModificationInstruction.L3SubType.class,
188 L3ModificationInstruction.ModIPInstruction.class,
189 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
190 L3ModificationInstruction.ModTtlInstruction.class,
191 org.onlab.packet.MplsLabel.class
192 )
193 .register(org.onosproject.cluster.NodeId.class)
194 .register(KryoNamespaces.BASIC)
195 .register(KryoNamespaces.MISC);
196
197 messageHandlingExecutor = Executors.
198 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
199 groupedThreads("onos/store/group",
200 "message-handlers"));
201 clusterCommunicator.
202 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
203 new ClusterGroupMsgHandler(),
204 messageHandlingExecutor);
205
206 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700207 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
208 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
209
210 groupStoreEntriesByKey = keyMapBuilder
211 .withName("groupstorekeymap")
212 .withSerializer(kryoBuilder)
213 .withClockService(new GroupStoreLogicalClockManager<>())
214 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700215 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700216 log.debug("Current size of groupstorekeymap:{}",
217 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700219 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700220 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
221 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
222
223 auditPendingReqQueue = auditMapBuilder
224 .withName("pendinggroupkeymap")
225 .withSerializer(kryoBuilder)
226 .withClockService(new GroupStoreLogicalClockManager<>())
227 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700228 log.debug("Current size of pendinggroupkeymap:{}",
229 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230
alshabib10580802015-02-18 18:30:33 -0800231 log.info("Started");
232 }
233
234 @Deactivate
235 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700236 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800238 log.info("Stopped");
239 }
240
alshabib10580802015-02-18 18:30:33 -0800241 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700242 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800243 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
244 }
245
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700246 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
247 lazyEmptyGroupIdTable() {
248 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
249 }
250
alshabib10580802015-02-18 18:30:33 -0800251 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700252 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800253 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700254 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
257 getGroupStoreKeyMap() {
258 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700262 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700264 * @param deviceId identifier of the device
265 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800266 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700267 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
268 return createIfAbsentUnchecked(groupEntriesById,
269 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800270 }
271
272 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700273 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800274 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800276 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700277 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
278 getPendingGroupKeyTable() {
279 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800280 }
281
282 /**
283 * Returns the extraneous group id table for specified device.
284 *
285 * @param deviceId identifier of the device
286 * @return Map representing group key table of given device.
287 */
288 private ConcurrentMap<GroupId, Group>
289 getExtraneousGroupIdTable(DeviceId deviceId) {
290 return createIfAbsentUnchecked(extraneousGroupEntriesById,
291 deviceId,
292 lazyEmptyExtraneousGroupIdTable());
293 }
294
295 /**
296 * Returns the number of groups for the specified device in the store.
297 *
298 * @return number of groups for the specified device
299 */
300 @Override
301 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700302 return (getGroups(deviceId) != null) ?
303 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800304 }
305
306 /**
307 * Returns the groups associated with a device.
308 *
309 * @param deviceId the device ID
310 *
311 * @return the group entries
312 */
313 @Override
314 public Iterable<Group> getGroups(DeviceId deviceId) {
315 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700316 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700317 deviceId, getGroupStoreKeyMap().values().size());
318 return FluentIterable.from(getGroupStoreKeyMap().values())
319 .filter(input -> input.deviceId().equals(deviceId))
320 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800321 }
322
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700323 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
324 // flatten and make iterator unmodifiable
325 log.debug("getGroups: for device {} total number of groups {}",
326 deviceId, getGroupStoreKeyMap().values().size());
327 return FluentIterable.from(getGroupStoreKeyMap().values())
328 .filter(input -> input.deviceId().equals(deviceId));
329 }
330
alshabib10580802015-02-18 18:30:33 -0800331 /**
332 * Returns the stored group entry.
333 *
334 * @param deviceId the device ID
335 * @param appCookie the group key
336 *
337 * @return a group associated with the key
338 */
339 @Override
340 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700341 return getStoredGroupEntry(deviceId, appCookie);
342 }
343
344 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
345 GroupKey appCookie) {
346 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
347 appCookie));
348 }
349
350 @Override
351 public Group getGroup(DeviceId deviceId, GroupId groupId) {
352 return getStoredGroupEntry(deviceId, groupId);
353 }
354
355 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
356 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700357 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800358 }
359
360 private int getFreeGroupIdValue(DeviceId deviceId) {
361 int freeId = groupIdGen.incrementAndGet();
362
363 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700364 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800365 if (existing == null) {
366 existing = (
367 extraneousGroupEntriesById.get(deviceId) != null) ?
368 extraneousGroupEntriesById.get(deviceId).
369 get(new DefaultGroupId(freeId)) :
370 null;
371 }
372 if (existing != null) {
373 freeId = groupIdGen.incrementAndGet();
374 } else {
375 break;
376 }
377 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700378 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800379 return freeId;
380 }
381
382 /**
383 * Stores a new group entry using the information from group description.
384 *
385 * @param groupDesc group description to be used to create group entry
386 */
387 @Override
388 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700389 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800390 // Check if a group is existing with the same key
391 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 log.warn("Group already exists with the same key {}",
393 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800394 return;
395 }
396
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700397 // Check if group to be created by a remote instance
398 if (mastershipService.getLocalRole(
399 groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700400 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700401 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700402 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
403 log.error("No Master for device {}..."
404 + "Can not perform add group operation",
405 groupDesc.deviceId());
406 //TODO: Send Group operation failure event
407 return;
408 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700409 GroupStoreMessage groupOp = GroupStoreMessage.
410 createGroupAddRequestMsg(groupDesc.deviceId(),
411 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700412
413 if (!clusterCommunicator.unicast(groupOp,
414 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
415 m -> kryoBuilder.build().serialize(m),
416 mastershipService.getMasterFor(groupDesc.deviceId()))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700417 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700418 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700419 mastershipService.getMasterFor(groupDesc.deviceId()));
420 //TODO: Send Group operation failure event
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700421 return;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700422 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700423 log.debug("Sent Group operation request for device {} to remote MASTER {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700424 groupDesc.deviceId(),
425 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800426 return;
427 }
428
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700429 log.debug("Store group for device {} is getting handled locally",
430 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800431 storeGroupDescriptionInternal(groupDesc);
432 }
433
434 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
435 // Check if a group is existing with the same key
436 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
437 return;
438 }
439
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700440 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
441 // Device group audit has not completed yet
442 // Add this group description to pending group key table
443 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700444 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700445 groupDesc.deviceId());
446 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
447 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
448 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
449 getPendingGroupKeyTable();
450 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
451 groupDesc.appCookie()),
452 group);
453 return;
454 }
455
Saurav Das100e3b82015-04-30 11:12:10 -0700456 GroupId id = null;
457 if (groupDesc.givenGroupId() == null) {
458 // Get a new group identifier
459 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
460 } else {
461 id = new DefaultGroupId(groupDesc.givenGroupId());
462 }
alshabib10580802015-02-18 18:30:33 -0800463 // Create a group entry object
464 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700465 // Insert the newly created group entry into key and id maps
466 getGroupStoreKeyMap().
467 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
468 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700469 // Ensure it also inserted into group id based table to
470 // avoid any chances of duplication in group id generation
471 getGroupIdTable(groupDesc.deviceId()).
472 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700473 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
474 id,
475 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800476 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
477 group));
478 }
479
480 /**
481 * Updates the existing group entry with the information
482 * from group description.
483 *
484 * @param deviceId the device ID
485 * @param oldAppCookie the current group key
486 * @param type update type
487 * @param newBuckets group buckets for updates
488 * @param newAppCookie optional new group key
489 */
490 @Override
491 public void updateGroupDescription(DeviceId deviceId,
492 GroupKey oldAppCookie,
493 UpdateType type,
494 GroupBuckets newBuckets,
495 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700496 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700497 if (mastershipService.getMasterFor(deviceId) != null &&
498 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700499 log.debug("updateGroupDescription: Device {} local role is not MASTER",
500 deviceId);
501 if (mastershipService.getMasterFor(deviceId) == null) {
502 log.error("No Master for device {}..."
503 + "Can not perform update group operation",
504 deviceId);
505 //TODO: Send Group operation failure event
506 return;
507 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700508 GroupStoreMessage groupOp = GroupStoreMessage.
509 createGroupUpdateRequestMsg(deviceId,
510 oldAppCookie,
511 type,
512 newBuckets,
513 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700514
515 if (!clusterCommunicator.unicast(groupOp,
516 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
517 m -> kryoBuilder.build().serialize(m),
518 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700519 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700520 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700521 mastershipService.getMasterFor(deviceId));
522 //TODO: Send Group operation failure event
523 }
524 return;
525 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700526 log.debug("updateGroupDescription for device {} is getting handled locally",
527 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700528 updateGroupDescriptionInternal(deviceId,
529 oldAppCookie,
530 type,
531 newBuckets,
532 newAppCookie);
533 }
534
535 private void updateGroupDescriptionInternal(DeviceId deviceId,
536 GroupKey oldAppCookie,
537 UpdateType type,
538 GroupBuckets newBuckets,
539 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800540 // Check if a group is existing with the provided key
541 Group oldGroup = getGroup(deviceId, oldAppCookie);
542 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700543 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800544 return;
545 }
546
547 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
548 type,
549 newBuckets);
550 if (newBucketList != null) {
551 // Create a new group object from the old group
552 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
553 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
554 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
555 oldGroup.deviceId(),
556 oldGroup.type(),
557 updatedBuckets,
558 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700559 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800560 oldGroup.appId());
561 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
562 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700563 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
564 oldGroup.id(),
565 oldGroup.deviceId(),
566 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800567 newGroup.setState(GroupState.PENDING_UPDATE);
568 newGroup.setLife(oldGroup.life());
569 newGroup.setPackets(oldGroup.packets());
570 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700571 //Update the group entry in groupkey based map.
572 //Update to groupid based map will happen in the
573 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700574 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
575 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700576 getGroupStoreKeyMap().
577 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
578 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800579 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700580 } else {
581 log.warn("updateGroupDescriptionInternal with type {}: No "
582 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800583 }
584 }
585
586 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
587 UpdateType type,
588 GroupBuckets buckets) {
589 GroupBuckets oldBuckets = oldGroup.buckets();
590 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
591 oldBuckets.buckets());
592 boolean groupDescUpdated = false;
593
594 if (type == UpdateType.ADD) {
595 // Check if the any of the new buckets are part of
596 // the old bucket list
597 for (GroupBucket addBucket:buckets.buckets()) {
598 if (!newBucketList.contains(addBucket)) {
599 newBucketList.add(addBucket);
600 groupDescUpdated = true;
601 }
602 }
603 } else if (type == UpdateType.REMOVE) {
604 // Check if the to be removed buckets are part of the
605 // old bucket list
606 for (GroupBucket removeBucket:buckets.buckets()) {
607 if (newBucketList.contains(removeBucket)) {
608 newBucketList.remove(removeBucket);
609 groupDescUpdated = true;
610 }
611 }
612 }
613
614 if (groupDescUpdated) {
615 return newBucketList;
616 } else {
617 return null;
618 }
619 }
620
621 /**
622 * Triggers deleting the existing group entry.
623 *
624 * @param deviceId the device ID
625 * @param appCookie the group key
626 */
627 @Override
628 public void deleteGroupDescription(DeviceId deviceId,
629 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700630 // Check if group to be deleted by a remote instance
631 if (mastershipService.
632 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700633 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
634 deviceId);
635 if (mastershipService.getMasterFor(deviceId) == null) {
636 log.error("No Master for device {}..."
637 + "Can not perform delete group operation",
638 deviceId);
639 //TODO: Send Group operation failure event
640 return;
641 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700642 GroupStoreMessage groupOp = GroupStoreMessage.
643 createGroupDeleteRequestMsg(deviceId,
644 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700645
646 if (!clusterCommunicator.unicast(groupOp,
647 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
648 m -> kryoBuilder.build().serialize(m),
649 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700650 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700651 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700652 mastershipService.getMasterFor(deviceId));
653 //TODO: Send Group operation failure event
654 }
655 return;
656 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700657 log.debug("deleteGroupDescription in device {} is getting handled locally",
658 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700659 deleteGroupDescriptionInternal(deviceId, appCookie);
660 }
661
662 private void deleteGroupDescriptionInternal(DeviceId deviceId,
663 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800664 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700665 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800666 if (existing == null) {
667 return;
668 }
669
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700670 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
671 existing.id(),
672 existing.deviceId(),
673 existing.state());
alshabib10580802015-02-18 18:30:33 -0800674 synchronized (existing) {
675 existing.setState(GroupState.PENDING_DELETE);
676 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700677 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
678 deviceId);
alshabib10580802015-02-18 18:30:33 -0800679 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
680 }
681
682 /**
683 * Stores a new group entry, or updates an existing entry.
684 *
685 * @param group group entry
686 */
687 @Override
688 public void addOrUpdateGroupEntry(Group group) {
689 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700690 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
691 group.id());
alshabib10580802015-02-18 18:30:33 -0800692 GroupEvent event = null;
693
694 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700695 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700696 group.id(),
697 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800698 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700699 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700700 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700701 existing.buckets().buckets()
702 .stream()
703 .filter((existingBucket)->(existingBucket.equals(bucket)))
704 .findFirst();
705 if (matchingBucket.isPresent()) {
706 ((StoredGroupBucketEntry) matchingBucket.
707 get()).setPackets(bucket.packets());
708 ((StoredGroupBucketEntry) matchingBucket.
709 get()).setBytes(bucket.bytes());
710 } else {
711 log.warn("addOrUpdateGroupEntry: No matching "
712 + "buckets to update stats");
713 }
714 }
alshabib10580802015-02-18 18:30:33 -0800715 existing.setLife(group.life());
716 existing.setPackets(group.packets());
717 existing.setBytes(group.bytes());
718 if (existing.state() == GroupState.PENDING_ADD) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700719 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
720 existing.id(),
721 existing.deviceId(),
722 GroupState.PENDING_ADD);
alshabib10580802015-02-18 18:30:33 -0800723 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700724 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800725 event = new GroupEvent(Type.GROUP_ADDED, existing);
726 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700727 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
728 existing.id(),
729 existing.deviceId(),
730 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700731 existing.setState(GroupState.ADDED);
732 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800733 event = new GroupEvent(Type.GROUP_UPDATED, existing);
734 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700735 //Re-PUT map entries to trigger map update events
736 getGroupStoreKeyMap().
737 put(new GroupStoreKeyMapKey(existing.deviceId(),
738 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800739 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700740 } else {
741 log.warn("addOrUpdateGroupEntry: Group update "
742 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800743 }
744
745 if (event != null) {
746 notifyDelegate(event);
747 }
748 }
749
750 /**
751 * Removes the group entry from store.
752 *
753 * @param group group entry
754 */
755 @Override
756 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700757 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
758 group.id());
alshabib10580802015-02-18 18:30:33 -0800759
760 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700761 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700762 group.id(),
763 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700764 //Removal from groupid based map will happen in the
765 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700766 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
767 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800768 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700769 } else {
770 log.warn("removeGroupEntry for {} in device{} is "
771 + "not existing in our maps",
772 group.id(),
773 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800774 }
775 }
776
777 @Override
778 public void deviceInitialAuditCompleted(DeviceId deviceId,
779 boolean completed) {
780 synchronized (deviceAuditStatus) {
781 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700782 log.debug("AUDIT completed for device {}",
783 deviceId);
alshabib10580802015-02-18 18:30:33 -0800784 deviceAuditStatus.put(deviceId, true);
785 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700786 List<StoredGroupEntry> pendingGroupRequests =
787 getPendingGroupKeyTable().values()
788 .stream()
789 .filter(g-> g.deviceId().equals(deviceId))
790 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700791 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700792 deviceId,
793 pendingGroupRequests.size());
794 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800795 GroupDescription tmp = new DefaultGroupDescription(
796 group.deviceId(),
797 group.type(),
798 group.buckets(),
799 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700800 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800801 group.appId());
802 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700803 getPendingGroupKeyTable().
804 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800805 }
alshabib10580802015-02-18 18:30:33 -0800806 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700807 Boolean audited = deviceAuditStatus.get(deviceId);
808 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700809 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800810 deviceAuditStatus.put(deviceId, false);
811 }
812 }
813 }
814 }
815
816 @Override
817 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
818 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700819 Boolean audited = deviceAuditStatus.get(deviceId);
820 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800821 }
822 }
823
824 @Override
825 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
826
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700827 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
828 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800829
830 if (existing == null) {
831 log.warn("No group entry with ID {} found ", operation.groupId());
832 return;
833 }
834
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700835 log.warn("groupOperationFailed: group operation {} failed"
836 + "for group {} in device {}",
837 operation.opType(),
838 existing.id(),
839 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800840 switch (operation.opType()) {
841 case ADD:
842 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700843 log.warn("groupOperationFailed: cleaningup "
844 + "group {} from store in device {}....",
845 existing.id(),
846 existing.deviceId());
847 //Removal from groupid based map will happen in the
848 //map update listener
849 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
850 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800851 break;
852 case MODIFY:
853 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
854 break;
855 case DELETE:
856 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
857 break;
858 default:
859 log.warn("Unknown group operation type {}", operation.opType());
860 }
alshabib10580802015-02-18 18:30:33 -0800861 }
862
863 @Override
864 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700865 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700866 group.id(),
867 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800868 ConcurrentMap<GroupId, Group> extraneousIdTable =
869 getExtraneousGroupIdTable(group.deviceId());
870 extraneousIdTable.put(group.id(), group);
871 // Check the reference counter
872 if (group.referenceCount() == 0) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700873 log.debug("Flow reference counter is zero and triggering remove",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700874 group.id(),
875 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800876 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
877 }
878 }
879
880 @Override
881 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700882 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700883 group.id(),
884 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800885 ConcurrentMap<GroupId, Group> extraneousIdTable =
886 getExtraneousGroupIdTable(group.deviceId());
887 extraneousIdTable.remove(group.id());
888 }
889
890 @Override
891 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
892 // flatten and make iterator unmodifiable
893 return FluentIterable.from(
894 getExtraneousGroupIdTable(deviceId).values());
895 }
896
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700897 /**
898 * ClockService that generates wallclock based timestamps.
899 */
900 private class GroupStoreLogicalClockManager<T, U>
901 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800902
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700903 private final AtomicLong sequenceNumber = new AtomicLong(0);
904
905 @Override
906 public Timestamp getTimestamp(T t1, U u1) {
907 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
908 sequenceNumber.getAndIncrement());
909 }
910 }
911
912 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700913 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700914 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700915 private class GroupStoreKeyMapListener implements
916 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700917
918 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700919 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700920 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700921 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700922 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700923 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700924 if ((key == null) && (group == null)) {
925 log.error("GroupStoreKeyMapListener: Received "
926 + "event {} with null entry", mapEvent.type());
927 return;
928 } else if (group == null) {
929 group = getGroupIdTable(key.deviceId()).values()
930 .stream()
931 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
932 .findFirst().get();
933 if (group == null) {
934 log.error("GroupStoreKeyMapListener: Received "
935 + "event {} with null entry... can not process", mapEvent.type());
936 return;
937 }
938 }
939 log.trace("received groupid map event {} for id {} in device {}",
940 mapEvent.type(),
941 group.id(),
942 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700943 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700944 // Update the group ID table
945 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700946 if (mapEvent.value().state() == Group.GroupState.ADDED) {
947 if (mapEvent.value().isGroupStateAddedFirstTime()) {
948 groupEvent = new GroupEvent(Type.GROUP_ADDED,
949 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700950 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
951 group.id(),
952 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700953 } else {
954 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
955 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700956 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
957 group.id(),
958 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700959 }
960 }
961 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700962 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700963 // Remove the entry from the group ID table
964 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700965 }
966
967 if (groupEvent != null) {
968 notifyDelegate(groupEvent);
969 }
970 }
971 }
972 /**
973 * Message handler to receive messages from group subsystems of
974 * other cluster members.
975 */
976 private final class ClusterGroupMsgHandler
977 implements ClusterMessageHandler {
978 @Override
979 public void handle(ClusterMessage message) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700980 if (message.subject().equals(
981 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700982 GroupStoreMessage groupOp = kryoBuilder.
983 build().deserialize(message.payload());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700984 log.debug("received remote group operation {} request for device {}",
985 groupOp.type(),
986 groupOp.deviceId());
987 if (mastershipService.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700988 getLocalRole(groupOp.deviceId()) !=
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700989 MastershipRole.MASTER) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700990 log.warn("ClusterGroupMsgHandler: This node is not "
991 + "MASTER for device {}", groupOp.deviceId());
992 return;
993 }
994 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700995 storeGroupDescriptionInternal(groupOp.groupDesc());
996 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700997 updateGroupDescriptionInternal(groupOp.deviceId(),
998 groupOp.appCookie(),
999 groupOp.updateType(),
1000 groupOp.updateBuckets(),
1001 groupOp.newAppCookie());
1002 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001003 deleteGroupDescriptionInternal(groupOp.deviceId(),
1004 groupOp.appCookie());
1005 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001006 } else {
1007 log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
1008 message.subject());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001009 }
1010 }
1011 }
1012
1013 /**
1014 * Flattened map key to be used to store group entries.
1015 */
1016 private class GroupStoreMapKey {
1017 private final DeviceId deviceId;
1018
1019 public GroupStoreMapKey(DeviceId deviceId) {
1020 this.deviceId = deviceId;
1021 }
1022
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001023 public DeviceId deviceId() {
1024 return deviceId;
1025 }
1026
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001027 @Override
1028 public boolean equals(Object o) {
1029 if (this == o) {
1030 return true;
1031 }
1032 if (!(o instanceof GroupStoreMapKey)) {
1033 return false;
1034 }
1035 GroupStoreMapKey that = (GroupStoreMapKey) o;
1036 return this.deviceId.equals(that.deviceId);
1037 }
1038
1039 @Override
1040 public int hashCode() {
1041 int result = 17;
1042
1043 result = 31 * result + Objects.hash(this.deviceId);
1044
1045 return result;
1046 }
1047 }
1048
1049 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
1050 private final GroupKey appCookie;
1051 public GroupStoreKeyMapKey(DeviceId deviceId,
1052 GroupKey appCookie) {
1053 super(deviceId);
1054 this.appCookie = appCookie;
1055 }
1056
1057 @Override
1058 public boolean equals(Object o) {
1059 if (this == o) {
1060 return true;
1061 }
1062 if (!(o instanceof GroupStoreKeyMapKey)) {
1063 return false;
1064 }
1065 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1066 return (super.equals(that) &&
1067 this.appCookie.equals(that.appCookie));
1068 }
1069
1070 @Override
1071 public int hashCode() {
1072 int result = 17;
1073
1074 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1075
1076 return result;
1077 }
1078 }
1079
1080 private class GroupStoreIdMapKey extends GroupStoreMapKey {
1081 private final GroupId groupId;
1082 public GroupStoreIdMapKey(DeviceId deviceId,
1083 GroupId groupId) {
1084 super(deviceId);
1085 this.groupId = groupId;
1086 }
1087
1088 @Override
1089 public boolean equals(Object o) {
1090 if (this == o) {
1091 return true;
1092 }
1093 if (!(o instanceof GroupStoreIdMapKey)) {
1094 return false;
1095 }
1096 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1097 return (super.equals(that) &&
1098 this.groupId.equals(that.groupId));
1099 }
1100
1101 @Override
1102 public int hashCode() {
1103 int result = 17;
1104
1105 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1106
1107 return result;
1108 }
1109 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001110
1111 @Override
1112 public void pushGroupMetrics(DeviceId deviceId,
1113 Collection<Group> groupEntries) {
1114 boolean deviceInitialAuditStatus =
1115 deviceInitialAuditStatus(deviceId);
1116 Set<Group> southboundGroupEntries =
1117 Sets.newHashSet(groupEntries);
1118 Set<StoredGroupEntry> storedGroupEntries =
1119 Sets.newHashSet(getStoredGroups(deviceId));
1120 Set<Group> extraneousStoredEntries =
1121 Sets.newHashSet(getExtraneousGroups(deviceId));
1122
1123 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1124 southboundGroupEntries.size(),
1125 deviceId);
1126 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1127 Group group = it.next();
1128 log.trace("Group {} in device {}", group, deviceId);
1129 }
1130
1131 log.trace("Displaying all ({}) stored group entries for device {}",
1132 storedGroupEntries.size(),
1133 deviceId);
1134 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1135 it1.hasNext();) {
1136 Group group = it1.next();
1137 log.trace("Stored Group {} for device {}", group, deviceId);
1138 }
1139
1140 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1141 Group group = it2.next();
1142 if (storedGroupEntries.remove(group)) {
1143 // we both have the group, let's update some info then.
1144 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1145 group.id(), deviceId);
1146 groupAdded(group);
1147 it2.remove();
1148 }
1149 }
1150 for (Group group : southboundGroupEntries) {
1151 if (getGroup(group.deviceId(), group.id()) != null) {
1152 // There is a group existing with the same id
1153 // It is possible that group update is
1154 // in progress while we got a stale info from switch
1155 if (!storedGroupEntries.remove(getGroup(
1156 group.deviceId(), group.id()))) {
1157 log.warn("Group AUDIT: Inconsistent state:"
1158 + "Group exists in ID based table while "
1159 + "not present in key based table");
1160 }
1161 } else {
1162 // there are groups in the switch that aren't in the store
1163 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1164 group.id(), deviceId);
1165 extraneousStoredEntries.remove(group);
1166 extraneousGroup(group);
1167 }
1168 }
1169 for (Group group : storedGroupEntries) {
1170 // there are groups in the store that aren't in the switch
1171 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1172 group.id(), deviceId);
1173 groupMissing(group);
1174 }
1175 for (Group group : extraneousStoredEntries) {
1176 // there are groups in the extraneous store that
1177 // aren't in the switch
1178 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1179 group.id(), deviceId);
1180 removeExtraneousGroupEntry(group);
1181 }
1182
1183 if (!deviceInitialAuditStatus) {
1184 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1185 deviceId);
1186 deviceInitialAuditCompleted(deviceId, true);
1187 }
1188 }
1189
1190 private void groupMissing(Group group) {
1191 switch (group.state()) {
1192 case PENDING_DELETE:
1193 log.debug("Group {} delete confirmation from device {}",
1194 group, group.deviceId());
1195 removeGroupEntry(group);
1196 break;
1197 case ADDED:
1198 case PENDING_ADD:
1199 case PENDING_UPDATE:
1200 log.debug("Group {} is in store but not on device {}",
1201 group, group.deviceId());
1202 StoredGroupEntry existing =
1203 getStoredGroupEntry(group.deviceId(), group.id());
1204 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD",
1205 existing.id(),
1206 existing.deviceId(),
1207 existing.state());
1208 existing.setState(Group.GroupState.PENDING_ADD);
1209 //Re-PUT map entries to trigger map update events
1210 getGroupStoreKeyMap().
1211 put(new GroupStoreKeyMapKey(existing.deviceId(),
1212 existing.appCookie()), existing);
1213 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1214 group));
1215 break;
1216 default:
1217 log.debug("Group {} has not been installed.", group);
1218 break;
1219 }
1220 }
1221
1222 private void extraneousGroup(Group group) {
1223 log.debug("Group {} is on device {} but not in store.",
1224 group, group.deviceId());
1225 addOrUpdateExtraneousGroupEntry(group);
1226 }
1227
1228 private void groupAdded(Group group) {
1229 log.trace("Group {} Added or Updated in device {}",
1230 group, group.deviceId());
1231 addOrUpdateGroupEntry(group);
1232 }
alshabib10580802015-02-18 18:30:33 -08001233}