blob: cd42b171ddd680b3b123713c01334bf4330e7b1a [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070020import com.google.common.collect.Sets;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070021
alshabib10580802015-02-18 18:30:33 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080027import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080029import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080032import org.onosproject.core.DefaultGroupId;
33import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080035import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070036import org.onosproject.net.MastershipRole;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070047import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080048import org.onosproject.net.group.Group;
49import org.onosproject.net.group.Group.GroupState;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupEvent.Type;
55import org.onosproject.net.group.GroupKey;
56import org.onosproject.net.group.GroupOperation;
57import org.onosproject.net.group.GroupStore;
58import org.onosproject.net.group.GroupStoreDelegate;
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -070059import org.onosproject.net.group.StoredGroupBucketEntry;
alshabib10580802015-02-18 18:30:33 -080060import org.onosproject.net.group.StoredGroupEntry;
61import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
63import org.onosproject.store.cluster.messaging.ClusterMessage;
64import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jonathan Hart63939a32015-05-08 11:57:03 -070065import org.onosproject.store.service.MultiValuedTimestamp;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070066import org.onosproject.store.serializers.DeviceIdSerializer;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070067import org.onosproject.store.serializers.KryoNamespaces;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070068import org.onosproject.store.serializers.URISerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapBuilder;
71import org.onosproject.store.service.EventuallyConsistentMapEvent;
72import org.onosproject.store.service.EventuallyConsistentMapListener;
73import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080074import org.slf4j.Logger;
75
Jonathan Hart6ec029a2015-03-24 17:12:35 -070076import java.net.URI;
77import java.util.ArrayList;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070078import java.util.Collection;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070079import java.util.HashMap;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070080import java.util.Iterator;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070081import java.util.List;
82import java.util.Objects;
Sho SHIMIZU30d639b2015-05-05 09:30:35 -070083import java.util.Optional;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -070084import java.util.Set;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070085import java.util.concurrent.ConcurrentHashMap;
86import java.util.concurrent.ConcurrentMap;
87import java.util.concurrent.ExecutorService;
88import java.util.concurrent.Executors;
89import java.util.concurrent.atomic.AtomicInteger;
90import java.util.concurrent.atomic.AtomicLong;
91import java.util.stream.Collectors;
92
93import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
94import static org.onlab.util.Tools.groupedThreads;
95import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080096
97/**
98 * Manages inventory of group entries using eventually consistent maps shared across the cluster.
99 */
100@Component(immediate = true)
101@Service
102public class DistributedGroupStore
103 extends AbstractStore<GroupEvent, GroupStoreDelegate>
104 implements GroupStore {
105
106 private final Logger log = getLogger(getClass());
107
108 private final int dummyId = 0xffffffff;
109 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
110
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterCommunicationService clusterCommunicator;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ClusterService clusterService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700118 protected StorageService storageService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700121 protected MastershipService mastershipService;
122
123 // Per device group table with (device id + app cookie) as key
124 private EventuallyConsistentMap<GroupStoreKeyMapKey,
125 StoredGroupEntry> groupStoreEntriesByKey = null;
126 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700127 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
128 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700129 private EventuallyConsistentMap<GroupStoreKeyMapKey,
130 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800131 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
132 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700133 private ExecutorService messageHandlingExecutor;
134 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800135
136 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
137 new HashMap<DeviceId, Boolean>();
138
139 private final AtomicInteger groupIdGen = new AtomicInteger();
140
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700141 private KryoNamespace.Builder kryoBuilder = null;
142
Madan Jampanibcf1a482015-06-24 19:05:56 -0700143 private final AtomicLong sequenceNumber = new AtomicLong(0);
144
alshabib10580802015-02-18 18:30:33 -0800145 @Activate
146 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700147 kryoBuilder = new KryoNamespace.Builder()
148 .register(DefaultGroup.class,
149 DefaultGroupBucket.class,
150 DefaultGroupDescription.class,
151 DefaultGroupKey.class,
152 GroupDescription.Type.class,
153 Group.GroupState.class,
154 GroupBuckets.class,
155 DefaultGroupId.class,
156 GroupStoreMessage.class,
157 GroupStoreMessage.Type.class,
158 UpdateType.class,
159 GroupStoreMessageSubjects.class,
160 MultiValuedTimestamp.class,
161 GroupStoreKeyMapKey.class,
162 GroupStoreIdMapKey.class,
163 GroupStoreMapKey.class
164 )
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700165 .register(new URISerializer(), URI.class)
166 .register(new DeviceIdSerializer(), DeviceId.class)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700167 .register(PortNumber.class)
168 .register(DefaultApplicationId.class)
169 .register(DefaultTrafficTreatment.class,
170 Instructions.DropInstruction.class,
171 Instructions.OutputInstruction.class,
172 Instructions.GroupInstruction.class,
173 Instructions.TableTypeTransition.class,
174 FlowRule.Type.class,
175 L0ModificationInstruction.class,
176 L0ModificationInstruction.L0SubType.class,
177 L0ModificationInstruction.ModLambdaInstruction.class,
178 L2ModificationInstruction.class,
179 L2ModificationInstruction.L2SubType.class,
180 L2ModificationInstruction.ModEtherInstruction.class,
181 L2ModificationInstruction.PushHeaderInstructions.class,
182 L2ModificationInstruction.ModVlanIdInstruction.class,
183 L2ModificationInstruction.ModVlanPcpInstruction.class,
184 L2ModificationInstruction.ModMplsLabelInstruction.class,
185 L2ModificationInstruction.ModMplsTtlInstruction.class,
186 L3ModificationInstruction.class,
187 L3ModificationInstruction.L3SubType.class,
188 L3ModificationInstruction.ModIPInstruction.class,
189 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
190 L3ModificationInstruction.ModTtlInstruction.class,
191 org.onlab.packet.MplsLabel.class
192 )
193 .register(org.onosproject.cluster.NodeId.class)
194 .register(KryoNamespaces.BASIC)
195 .register(KryoNamespaces.MISC);
196
197 messageHandlingExecutor = Executors.
198 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
199 groupedThreads("onos/store/group",
200 "message-handlers"));
201 clusterCommunicator.
202 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
203 new ClusterGroupMsgHandler(),
204 messageHandlingExecutor);
205
206 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700207 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
208 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
209
210 groupStoreEntriesByKey = keyMapBuilder
211 .withName("groupstorekeymap")
212 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700213 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
214 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700215 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700216 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700217 log.debug("Current size of groupstorekeymap:{}",
218 groupStoreEntriesByKey.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700219
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700220 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700221 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
222 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
223
224 auditPendingReqQueue = auditMapBuilder
225 .withName("pendinggroupkeymap")
226 .withSerializer(kryoBuilder)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700227 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
228 sequenceNumber.getAndIncrement()))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700229 .build();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700230 log.debug("Current size of pendinggroupkeymap:{}",
231 auditPendingReqQueue.size());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700232
alshabib10580802015-02-18 18:30:33 -0800233 log.info("Started");
234 }
235
236 @Deactivate
237 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700238 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700239 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800240 log.info("Stopped");
241 }
242
alshabib10580802015-02-18 18:30:33 -0800243 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700244 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800245 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
246 }
247
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700248 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
249 lazyEmptyGroupIdTable() {
250 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
251 }
252
alshabib10580802015-02-18 18:30:33 -0800253 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700254 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800255 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700256 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800257 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700258 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
259 getGroupStoreKeyMap() {
260 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800261 }
262
263 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700264 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800265 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700266 * @param deviceId identifier of the device
267 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800268 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700269 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
270 return createIfAbsentUnchecked(groupEntriesById,
271 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800272 }
273
274 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700275 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800276 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700277 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800278 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700279 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
280 getPendingGroupKeyTable() {
281 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800282 }
283
284 /**
285 * Returns the extraneous group id table for specified device.
286 *
287 * @param deviceId identifier of the device
288 * @return Map representing group key table of given device.
289 */
290 private ConcurrentMap<GroupId, Group>
291 getExtraneousGroupIdTable(DeviceId deviceId) {
292 return createIfAbsentUnchecked(extraneousGroupEntriesById,
293 deviceId,
294 lazyEmptyExtraneousGroupIdTable());
295 }
296
297 /**
298 * Returns the number of groups for the specified device in the store.
299 *
300 * @return number of groups for the specified device
301 */
302 @Override
303 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700304 return (getGroups(deviceId) != null) ?
305 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800306 }
307
308 /**
309 * Returns the groups associated with a device.
310 *
311 * @param deviceId the device ID
312 *
313 * @return the group entries
314 */
315 @Override
316 public Iterable<Group> getGroups(DeviceId deviceId) {
317 // flatten and make iterator unmodifiable
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700318 log.debug("getGroups: for device {} total number of groups {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700319 deviceId, getGroupStoreKeyMap().values().size());
320 return FluentIterable.from(getGroupStoreKeyMap().values())
321 .filter(input -> input.deviceId().equals(deviceId))
322 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800323 }
324
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700325 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
326 // flatten and make iterator unmodifiable
327 log.debug("getGroups: for device {} total number of groups {}",
328 deviceId, getGroupStoreKeyMap().values().size());
329 return FluentIterable.from(getGroupStoreKeyMap().values())
330 .filter(input -> input.deviceId().equals(deviceId));
331 }
332
alshabib10580802015-02-18 18:30:33 -0800333 /**
334 * Returns the stored group entry.
335 *
336 * @param deviceId the device ID
337 * @param appCookie the group key
338 *
339 * @return a group associated with the key
340 */
341 @Override
342 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700343 return getStoredGroupEntry(deviceId, appCookie);
344 }
345
346 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
347 GroupKey appCookie) {
348 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
349 appCookie));
350 }
351
352 @Override
353 public Group getGroup(DeviceId deviceId, GroupId groupId) {
354 return getStoredGroupEntry(deviceId, groupId);
355 }
356
357 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
358 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700359 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800360 }
361
362 private int getFreeGroupIdValue(DeviceId deviceId) {
363 int freeId = groupIdGen.incrementAndGet();
364
365 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700366 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800367 if (existing == null) {
368 existing = (
369 extraneousGroupEntriesById.get(deviceId) != null) ?
370 extraneousGroupEntriesById.get(deviceId).
371 get(new DefaultGroupId(freeId)) :
372 null;
373 }
374 if (existing != null) {
375 freeId = groupIdGen.incrementAndGet();
376 } else {
377 break;
378 }
379 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700380 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
alshabib10580802015-02-18 18:30:33 -0800381 return freeId;
382 }
383
384 /**
385 * Stores a new group entry using the information from group description.
386 *
387 * @param groupDesc group description to be used to create group entry
388 */
389 @Override
390 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700391 log.debug("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800392 // Check if a group is existing with the same key
393 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700394 log.warn("Group already exists with the same key {}",
395 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800396 return;
397 }
398
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700399 // Check if group to be created by a remote instance
Madan Jampani175e8fd2015-05-20 14:10:45 -0700400 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700401 log.debug("storeGroupDescription: Device {} local role is not MASTER",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700402 groupDesc.deviceId());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700403 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
404 log.error("No Master for device {}..."
405 + "Can not perform add group operation",
406 groupDesc.deviceId());
407 //TODO: Send Group operation failure event
408 return;
409 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700410 GroupStoreMessage groupOp = GroupStoreMessage.
411 createGroupAddRequestMsg(groupDesc.deviceId(),
412 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700413
Madan Jampani175e8fd2015-05-20 14:10:45 -0700414 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700415 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
416 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700417 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
418 if (error != null) {
419 log.warn("Failed to send request to master: {} to {}",
420 groupOp,
421 mastershipService.getMasterFor(groupDesc.deviceId()));
422 //TODO: Send Group operation failure event
423 } else {
424 log.debug("Sent Group operation request for device {} "
425 + "to remote MASTER {}",
426 groupDesc.deviceId(),
427 mastershipService.getMasterFor(groupDesc.deviceId()));
428 }
429 });
alshabib10580802015-02-18 18:30:33 -0800430 return;
431 }
432
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700433 log.debug("Store group for device {} is getting handled locally",
434 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800435 storeGroupDescriptionInternal(groupDesc);
436 }
437
438 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
439 // Check if a group is existing with the same key
440 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
441 return;
442 }
443
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700444 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
445 // Device group audit has not completed yet
446 // Add this group description to pending group key table
447 // Create a group entry object with Dummy Group ID
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700448 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700449 groupDesc.deviceId());
450 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
451 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
452 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
453 getPendingGroupKeyTable();
454 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
455 groupDesc.appCookie()),
456 group);
457 return;
458 }
459
Saurav Das100e3b82015-04-30 11:12:10 -0700460 GroupId id = null;
461 if (groupDesc.givenGroupId() == null) {
462 // Get a new group identifier
463 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
464 } else {
465 id = new DefaultGroupId(groupDesc.givenGroupId());
466 }
alshabib10580802015-02-18 18:30:33 -0800467 // Create a group entry object
468 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700469 // Insert the newly created group entry into key and id maps
470 getGroupStoreKeyMap().
471 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
472 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700473 // Ensure it also inserted into group id based table to
474 // avoid any chances of duplication in group id generation
475 getGroupIdTable(groupDesc.deviceId()).
476 put(id, group);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700477 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
478 id,
479 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800480 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
481 group));
482 }
483
484 /**
485 * Updates the existing group entry with the information
486 * from group description.
487 *
488 * @param deviceId the device ID
489 * @param oldAppCookie the current group key
490 * @param type update type
491 * @param newBuckets group buckets for updates
492 * @param newAppCookie optional new group key
493 */
494 @Override
495 public void updateGroupDescription(DeviceId deviceId,
496 GroupKey oldAppCookie,
497 UpdateType type,
498 GroupBuckets newBuckets,
499 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700500 // Check if group update to be done by a remote instance
sangho52abe3a2015-05-05 14:13:34 -0700501 if (mastershipService.getMasterFor(deviceId) != null &&
502 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700503 log.debug("updateGroupDescription: Device {} local role is not MASTER",
504 deviceId);
505 if (mastershipService.getMasterFor(deviceId) == null) {
506 log.error("No Master for device {}..."
507 + "Can not perform update group operation",
508 deviceId);
509 //TODO: Send Group operation failure event
510 return;
511 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700512 GroupStoreMessage groupOp = GroupStoreMessage.
513 createGroupUpdateRequestMsg(deviceId,
514 oldAppCookie,
515 type,
516 newBuckets,
517 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700518
Madan Jampani175e8fd2015-05-20 14:10:45 -0700519 clusterCommunicator.unicast(groupOp,
520 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
521 m -> kryoBuilder.build().serialize(m),
522 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
523 if (error != null) {
524 log.warn("Failed to send request to master: {} to {}",
525 groupOp,
526 mastershipService.getMasterFor(deviceId), error);
527 }
528 //TODO: Send Group operation failure event
529 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700530 return;
531 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700532 log.debug("updateGroupDescription for device {} is getting handled locally",
533 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700534 updateGroupDescriptionInternal(deviceId,
535 oldAppCookie,
536 type,
537 newBuckets,
538 newAppCookie);
539 }
540
541 private void updateGroupDescriptionInternal(DeviceId deviceId,
542 GroupKey oldAppCookie,
543 UpdateType type,
544 GroupBuckets newBuckets,
545 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800546 // Check if a group is existing with the provided key
547 Group oldGroup = getGroup(deviceId, oldAppCookie);
548 if (oldGroup == null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700549 log.warn("updateGroupDescriptionInternal: Group not found...strange");
alshabib10580802015-02-18 18:30:33 -0800550 return;
551 }
552
553 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
554 type,
555 newBuckets);
556 if (newBucketList != null) {
557 // Create a new group object from the old group
558 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
559 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
560 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
561 oldGroup.deviceId(),
562 oldGroup.type(),
563 updatedBuckets,
564 newCookie,
Saurav Das100e3b82015-04-30 11:12:10 -0700565 oldGroup.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800566 oldGroup.appId());
567 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
568 updatedGroupDesc);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700569 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
570 oldGroup.id(),
571 oldGroup.deviceId(),
572 oldGroup.state());
alshabib10580802015-02-18 18:30:33 -0800573 newGroup.setState(GroupState.PENDING_UPDATE);
574 newGroup.setLife(oldGroup.life());
575 newGroup.setPackets(oldGroup.packets());
576 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700577 //Update the group entry in groupkey based map.
578 //Update to groupid based map will happen in the
579 //groupkey based map update listener
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700580 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
581 type);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700582 getGroupStoreKeyMap().
583 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
584 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800585 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700586 } else {
587 log.warn("updateGroupDescriptionInternal with type {}: No "
588 + "change in the buckets in update", type);
alshabib10580802015-02-18 18:30:33 -0800589 }
590 }
591
592 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
593 UpdateType type,
594 GroupBuckets buckets) {
595 GroupBuckets oldBuckets = oldGroup.buckets();
596 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
597 oldBuckets.buckets());
598 boolean groupDescUpdated = false;
599
600 if (type == UpdateType.ADD) {
601 // Check if the any of the new buckets are part of
602 // the old bucket list
603 for (GroupBucket addBucket:buckets.buckets()) {
604 if (!newBucketList.contains(addBucket)) {
605 newBucketList.add(addBucket);
606 groupDescUpdated = true;
607 }
608 }
609 } else if (type == UpdateType.REMOVE) {
610 // Check if the to be removed buckets are part of the
611 // old bucket list
612 for (GroupBucket removeBucket:buckets.buckets()) {
613 if (newBucketList.contains(removeBucket)) {
614 newBucketList.remove(removeBucket);
615 groupDescUpdated = true;
616 }
617 }
618 }
619
620 if (groupDescUpdated) {
621 return newBucketList;
622 } else {
623 return null;
624 }
625 }
626
627 /**
628 * Triggers deleting the existing group entry.
629 *
630 * @param deviceId the device ID
631 * @param appCookie the group key
632 */
633 @Override
634 public void deleteGroupDescription(DeviceId deviceId,
635 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700636 // Check if group to be deleted by a remote instance
637 if (mastershipService.
638 getLocalRole(deviceId) != MastershipRole.MASTER) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700639 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
640 deviceId);
641 if (mastershipService.getMasterFor(deviceId) == null) {
642 log.error("No Master for device {}..."
643 + "Can not perform delete group operation",
644 deviceId);
645 //TODO: Send Group operation failure event
646 return;
647 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700648 GroupStoreMessage groupOp = GroupStoreMessage.
649 createGroupDeleteRequestMsg(deviceId,
650 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700651
Madan Jampani175e8fd2015-05-20 14:10:45 -0700652 clusterCommunicator.unicast(groupOp,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700653 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
654 m -> kryoBuilder.build().serialize(m),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700655 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
656 if (error != null) {
657 log.warn("Failed to send request to master: {} to {}",
658 groupOp,
659 mastershipService.getMasterFor(deviceId), error);
660 }
661 //TODO: Send Group operation failure event
662 });
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700663 return;
664 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700665 log.debug("deleteGroupDescription in device {} is getting handled locally",
666 deviceId);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700667 deleteGroupDescriptionInternal(deviceId, appCookie);
668 }
669
670 private void deleteGroupDescriptionInternal(DeviceId deviceId,
671 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800672 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700673 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800674 if (existing == null) {
675 return;
676 }
677
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700678 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
679 existing.id(),
680 existing.deviceId(),
681 existing.state());
alshabib10580802015-02-18 18:30:33 -0800682 synchronized (existing) {
683 existing.setState(GroupState.PENDING_DELETE);
684 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700685 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
686 deviceId);
alshabib10580802015-02-18 18:30:33 -0800687 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
688 }
689
690 /**
691 * Stores a new group entry, or updates an existing entry.
692 *
693 * @param group group entry
694 */
695 @Override
696 public void addOrUpdateGroupEntry(Group group) {
697 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700698 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
699 group.id());
alshabib10580802015-02-18 18:30:33 -0800700 GroupEvent event = null;
701
702 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700703 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700704 group.id(),
705 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800706 synchronized (existing) {
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700707 for (GroupBucket bucket:group.buckets().buckets()) {
Sho SHIMIZU30d639b2015-05-05 09:30:35 -0700708 Optional<GroupBucket> matchingBucket =
Srikanth Vavilapalli10e75cd2015-04-13 16:21:24 -0700709 existing.buckets().buckets()
710 .stream()
711 .filter((existingBucket)->(existingBucket.equals(bucket)))
712 .findFirst();
713 if (matchingBucket.isPresent()) {
714 ((StoredGroupBucketEntry) matchingBucket.
715 get()).setPackets(bucket.packets());
716 ((StoredGroupBucketEntry) matchingBucket.
717 get()).setBytes(bucket.bytes());
718 } else {
719 log.warn("addOrUpdateGroupEntry: No matching "
720 + "buckets to update stats");
721 }
722 }
alshabib10580802015-02-18 18:30:33 -0800723 existing.setLife(group.life());
724 existing.setPackets(group.packets());
725 existing.setBytes(group.bytes());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700726 if ((existing.state() == GroupState.PENDING_ADD) ||
727 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700728 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
729 existing.id(),
730 existing.deviceId(),
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700731 existing.state());
alshabib10580802015-02-18 18:30:33 -0800732 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700733 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800734 event = new GroupEvent(Type.GROUP_ADDED, existing);
735 } else {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700736 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
737 existing.id(),
738 existing.deviceId(),
739 GroupState.PENDING_UPDATE);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700740 existing.setState(GroupState.ADDED);
741 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800742 event = new GroupEvent(Type.GROUP_UPDATED, existing);
743 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700744 //Re-PUT map entries to trigger map update events
745 getGroupStoreKeyMap().
746 put(new GroupStoreKeyMapKey(existing.deviceId(),
747 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800748 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700749 } else {
750 log.warn("addOrUpdateGroupEntry: Group update "
751 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800752 }
753
754 if (event != null) {
755 notifyDelegate(event);
756 }
757 }
758
759 /**
760 * Removes the group entry from store.
761 *
762 * @param group group entry
763 */
764 @Override
765 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700766 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
767 group.id());
alshabib10580802015-02-18 18:30:33 -0800768
769 if (existing != null) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700770 log.debug("removeGroupEntry: removing group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700771 group.id(),
772 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700773 //Removal from groupid based map will happen in the
774 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700775 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
776 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800777 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700778 } else {
779 log.warn("removeGroupEntry for {} in device{} is "
780 + "not existing in our maps",
781 group.id(),
782 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800783 }
784 }
785
786 @Override
787 public void deviceInitialAuditCompleted(DeviceId deviceId,
788 boolean completed) {
789 synchronized (deviceAuditStatus) {
790 if (completed) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700791 log.debug("AUDIT completed for device {}",
792 deviceId);
alshabib10580802015-02-18 18:30:33 -0800793 deviceAuditStatus.put(deviceId, true);
794 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700795 List<StoredGroupEntry> pendingGroupRequests =
796 getPendingGroupKeyTable().values()
797 .stream()
798 .filter(g-> g.deviceId().equals(deviceId))
799 .collect(Collectors.toList());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700800 log.debug("processing pending group add requests for device {} and number of pending requests {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700801 deviceId,
802 pendingGroupRequests.size());
803 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800804 GroupDescription tmp = new DefaultGroupDescription(
805 group.deviceId(),
806 group.type(),
807 group.buckets(),
808 group.appCookie(),
Saurav Das100e3b82015-04-30 11:12:10 -0700809 group.givenGroupId(),
alshabib10580802015-02-18 18:30:33 -0800810 group.appId());
811 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700812 getPendingGroupKeyTable().
813 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800814 }
alshabib10580802015-02-18 18:30:33 -0800815 } else {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700816 Boolean audited = deviceAuditStatus.get(deviceId);
817 if (audited != null && audited) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700818 log.debug("Clearing AUDIT status for device {}", deviceId);
alshabib10580802015-02-18 18:30:33 -0800819 deviceAuditStatus.put(deviceId, false);
820 }
821 }
822 }
823 }
824
825 @Override
826 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
827 synchronized (deviceAuditStatus) {
Thomas Vachuskac40d4632015-04-09 16:55:03 -0700828 Boolean audited = deviceAuditStatus.get(deviceId);
829 return audited != null && audited;
alshabib10580802015-02-18 18:30:33 -0800830 }
831 }
832
833 @Override
834 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
835
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700836 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
837 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800838
839 if (existing == null) {
840 log.warn("No group entry with ID {} found ", operation.groupId());
841 return;
842 }
843
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700844 log.warn("groupOperationFailed: group operation {} failed"
845 + "for group {} in device {}",
846 operation.opType(),
847 existing.id(),
848 existing.deviceId());
alshabib10580802015-02-18 18:30:33 -0800849 switch (operation.opType()) {
850 case ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -0700851 if (existing.state() == GroupState.PENDING_ADD) {
852 //TODO: Need to add support for passing the group
853 //operation failure reason from group provider.
854 //If the error type is anything other than GROUP_EXISTS,
855 //then the GROUP_ADD_FAILED event should be raised even
856 //in PENDING_ADD_RETRY state also.
857 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
858 log.warn("groupOperationFailed: cleaningup "
859 + "group {} from store in device {}....",
860 existing.id(),
861 existing.deviceId());
862 //Removal from groupid based map will happen in the
863 //map update listener
864 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
865 existing.appCookie()));
866 }
alshabib10580802015-02-18 18:30:33 -0800867 break;
868 case MODIFY:
869 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
870 break;
871 case DELETE:
872 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
873 break;
874 default:
875 log.warn("Unknown group operation type {}", operation.opType());
876 }
alshabib10580802015-02-18 18:30:33 -0800877 }
878
879 @Override
880 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700881 log.debug("add/update extraneous group entry {} in device {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700882 group.id(),
883 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800884 ConcurrentMap<GroupId, Group> extraneousIdTable =
885 getExtraneousGroupIdTable(group.deviceId());
886 extraneousIdTable.put(group.id(), group);
887 // Check the reference counter
888 if (group.referenceCount() == 0) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700889 log.debug("Flow reference counter is zero and triggering remove",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700890 group.id(),
891 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800892 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
893 }
894 }
895
896 @Override
897 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700898 log.debug("remove extraneous group entry {} of device {} from store",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700899 group.id(),
900 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800901 ConcurrentMap<GroupId, Group> extraneousIdTable =
902 getExtraneousGroupIdTable(group.deviceId());
903 extraneousIdTable.remove(group.id());
904 }
905
906 @Override
907 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
908 // flatten and make iterator unmodifiable
909 return FluentIterable.from(
910 getExtraneousGroupIdTable(deviceId).values());
911 }
912
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700913 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700914 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700915 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700916 private class GroupStoreKeyMapListener implements
917 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700918
919 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700920 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700921 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700922 GroupEvent groupEvent = null;
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700923 GroupStoreKeyMapKey key = mapEvent.key();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700924 StoredGroupEntry group = mapEvent.value();
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700925 if ((key == null) && (group == null)) {
926 log.error("GroupStoreKeyMapListener: Received "
927 + "event {} with null entry", mapEvent.type());
928 return;
929 } else if (group == null) {
930 group = getGroupIdTable(key.deviceId()).values()
931 .stream()
932 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
933 .findFirst().get();
934 if (group == null) {
935 log.error("GroupStoreKeyMapListener: Received "
936 + "event {} with null entry... can not process", mapEvent.type());
937 return;
938 }
939 }
940 log.trace("received groupid map event {} for id {} in device {}",
941 mapEvent.type(),
942 group.id(),
943 key.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700944 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700945 // Update the group ID table
946 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700947 if (mapEvent.value().state() == Group.GroupState.ADDED) {
948 if (mapEvent.value().isGroupStateAddedFirstTime()) {
949 groupEvent = new GroupEvent(Type.GROUP_ADDED,
950 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700951 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
952 group.id(),
953 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700954 } else {
955 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
956 mapEvent.value());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700957 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
958 group.id(),
959 group.deviceId());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700960 }
961 }
962 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700963 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700964 // Remove the entry from the group ID table
965 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700966 }
967
968 if (groupEvent != null) {
969 notifyDelegate(groupEvent);
970 }
971 }
972 }
973 /**
974 * Message handler to receive messages from group subsystems of
975 * other cluster members.
976 */
977 private final class ClusterGroupMsgHandler
978 implements ClusterMessageHandler {
979 @Override
980 public void handle(ClusterMessage message) {
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700981 if (message.subject().equals(
982 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700983 GroupStoreMessage groupOp = kryoBuilder.
984 build().deserialize(message.payload());
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700985 log.debug("received remote group operation {} request for device {}",
986 groupOp.type(),
987 groupOp.deviceId());
988 if (mastershipService.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700989 getLocalRole(groupOp.deviceId()) !=
Srikanth Vavilapalli23181912015-05-04 09:48:09 -0700990 MastershipRole.MASTER) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700991 log.warn("ClusterGroupMsgHandler: This node is not "
992 + "MASTER for device {}", groupOp.deviceId());
993 return;
994 }
995 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700996 storeGroupDescriptionInternal(groupOp.groupDesc());
997 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700998 updateGroupDescriptionInternal(groupOp.deviceId(),
999 groupOp.appCookie(),
1000 groupOp.updateType(),
1001 groupOp.updateBuckets(),
1002 groupOp.newAppCookie());
1003 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001004 deleteGroupDescriptionInternal(groupOp.deviceId(),
1005 groupOp.appCookie());
1006 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001007 } else {
1008 log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
1009 message.subject());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001010 }
1011 }
1012 }
1013
1014 /**
1015 * Flattened map key to be used to store group entries.
1016 */
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001017 protected static class GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001018 private final DeviceId deviceId;
1019
1020 public GroupStoreMapKey(DeviceId deviceId) {
1021 this.deviceId = deviceId;
1022 }
1023
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001024 public DeviceId deviceId() {
1025 return deviceId;
1026 }
1027
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001028 @Override
1029 public boolean equals(Object o) {
1030 if (this == o) {
1031 return true;
1032 }
1033 if (!(o instanceof GroupStoreMapKey)) {
1034 return false;
1035 }
1036 GroupStoreMapKey that = (GroupStoreMapKey) o;
1037 return this.deviceId.equals(that.deviceId);
1038 }
1039
1040 @Override
1041 public int hashCode() {
1042 int result = 17;
1043
1044 result = 31 * result + Objects.hash(this.deviceId);
1045
1046 return result;
1047 }
1048 }
1049
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001050 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001051 private final GroupKey appCookie;
1052 public GroupStoreKeyMapKey(DeviceId deviceId,
1053 GroupKey appCookie) {
1054 super(deviceId);
1055 this.appCookie = appCookie;
1056 }
1057
1058 @Override
1059 public boolean equals(Object o) {
1060 if (this == o) {
1061 return true;
1062 }
1063 if (!(o instanceof GroupStoreKeyMapKey)) {
1064 return false;
1065 }
1066 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1067 return (super.equals(that) &&
1068 this.appCookie.equals(that.appCookie));
1069 }
1070
1071 @Override
1072 public int hashCode() {
1073 int result = 17;
1074
1075 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1076
1077 return result;
1078 }
1079 }
1080
Ray Milkeyb3c5ce22015-08-10 09:07:36 -07001081 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -07001082 private final GroupId groupId;
1083 public GroupStoreIdMapKey(DeviceId deviceId,
1084 GroupId groupId) {
1085 super(deviceId);
1086 this.groupId = groupId;
1087 }
1088
1089 @Override
1090 public boolean equals(Object o) {
1091 if (this == o) {
1092 return true;
1093 }
1094 if (!(o instanceof GroupStoreIdMapKey)) {
1095 return false;
1096 }
1097 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1098 return (super.equals(that) &&
1099 this.groupId.equals(that.groupId));
1100 }
1101
1102 @Override
1103 public int hashCode() {
1104 int result = 17;
1105
1106 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1107
1108 return result;
1109 }
1110 }
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001111
1112 @Override
1113 public void pushGroupMetrics(DeviceId deviceId,
1114 Collection<Group> groupEntries) {
1115 boolean deviceInitialAuditStatus =
1116 deviceInitialAuditStatus(deviceId);
1117 Set<Group> southboundGroupEntries =
1118 Sets.newHashSet(groupEntries);
1119 Set<StoredGroupEntry> storedGroupEntries =
1120 Sets.newHashSet(getStoredGroups(deviceId));
1121 Set<Group> extraneousStoredEntries =
1122 Sets.newHashSet(getExtraneousGroups(deviceId));
1123
1124 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1125 southboundGroupEntries.size(),
1126 deviceId);
1127 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1128 Group group = it.next();
1129 log.trace("Group {} in device {}", group, deviceId);
1130 }
1131
1132 log.trace("Displaying all ({}) stored group entries for device {}",
1133 storedGroupEntries.size(),
1134 deviceId);
1135 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1136 it1.hasNext();) {
1137 Group group = it1.next();
1138 log.trace("Stored Group {} for device {}", group, deviceId);
1139 }
1140
1141 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1142 Group group = it2.next();
1143 if (storedGroupEntries.remove(group)) {
1144 // we both have the group, let's update some info then.
1145 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1146 group.id(), deviceId);
1147 groupAdded(group);
1148 it2.remove();
1149 }
1150 }
1151 for (Group group : southboundGroupEntries) {
1152 if (getGroup(group.deviceId(), group.id()) != null) {
1153 // There is a group existing with the same id
1154 // It is possible that group update is
1155 // in progress while we got a stale info from switch
1156 if (!storedGroupEntries.remove(getGroup(
1157 group.deviceId(), group.id()))) {
1158 log.warn("Group AUDIT: Inconsistent state:"
1159 + "Group exists in ID based table while "
1160 + "not present in key based table");
1161 }
1162 } else {
1163 // there are groups in the switch that aren't in the store
1164 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1165 group.id(), deviceId);
1166 extraneousStoredEntries.remove(group);
1167 extraneousGroup(group);
1168 }
1169 }
1170 for (Group group : storedGroupEntries) {
1171 // there are groups in the store that aren't in the switch
1172 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1173 group.id(), deviceId);
1174 groupMissing(group);
1175 }
1176 for (Group group : extraneousStoredEntries) {
1177 // there are groups in the extraneous store that
1178 // aren't in the switch
1179 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1180 group.id(), deviceId);
1181 removeExtraneousGroupEntry(group);
1182 }
1183
1184 if (!deviceInitialAuditStatus) {
1185 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1186 deviceId);
1187 deviceInitialAuditCompleted(deviceId, true);
1188 }
1189 }
1190
1191 private void groupMissing(Group group) {
1192 switch (group.state()) {
1193 case PENDING_DELETE:
1194 log.debug("Group {} delete confirmation from device {}",
1195 group, group.deviceId());
1196 removeGroupEntry(group);
1197 break;
1198 case ADDED:
1199 case PENDING_ADD:
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001200 case PENDING_ADD_RETRY:
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001201 case PENDING_UPDATE:
1202 log.debug("Group {} is in store but not on device {}",
1203 group, group.deviceId());
1204 StoredGroupEntry existing =
1205 getStoredGroupEntry(group.deviceId(), group.id());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001206 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001207 existing.id(),
1208 existing.deviceId(),
1209 existing.state());
Srikanth Vavilapalli5428b6c2015-05-14 20:22:47 -07001210 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
Srikanth Vavilapalli23181912015-05-04 09:48:09 -07001211 //Re-PUT map entries to trigger map update events
1212 getGroupStoreKeyMap().
1213 put(new GroupStoreKeyMapKey(existing.deviceId(),
1214 existing.appCookie()), existing);
1215 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1216 group));
1217 break;
1218 default:
1219 log.debug("Group {} has not been installed.", group);
1220 break;
1221 }
1222 }
1223
1224 private void extraneousGroup(Group group) {
1225 log.debug("Group {} is on device {} but not in store.",
1226 group, group.deviceId());
1227 addOrUpdateExtraneousGroupEntry(group);
1228 }
1229
1230 private void groupAdded(Group group) {
1231 log.trace("Group {} Added or Updated in device {}",
1232 group, group.deviceId());
1233 addOrUpdateGroupEntry(group);
1234 }
alshabib10580802015-02-18 18:30:33 -08001235}