blob: ae1669bed756f260e97230a946a5064492ed2cb0 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
alshabib10580802015-02-18 18:30:33 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080025import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080027import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080030import org.onosproject.core.DefaultGroupId;
31import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.net.MastershipRole;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.flow.instructions.L0ModificationInstruction;
40import org.onosproject.net.flow.instructions.L2ModificationInstruction;
41import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070043import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.Group;
47import org.onosproject.net.group.Group.GroupState;
48import org.onosproject.net.group.GroupBucket;
49import org.onosproject.net.group.GroupBuckets;
50import org.onosproject.net.group.GroupDescription;
51import org.onosproject.net.group.GroupEvent;
52import org.onosproject.net.group.GroupEvent.Type;
53import org.onosproject.net.group.GroupKey;
54import org.onosproject.net.group.GroupOperation;
55import org.onosproject.net.group.GroupStore;
56import org.onosproject.net.group.GroupStoreDelegate;
57import org.onosproject.net.group.StoredGroupEntry;
58import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070059import org.onosproject.store.Timestamp;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070063import org.onosproject.store.impl.MultiValuedTimestamp;
64import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070065import org.onosproject.store.service.ClockService;
66import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapBuilder;
68import org.onosproject.store.service.EventuallyConsistentMapEvent;
69import org.onosproject.store.service.EventuallyConsistentMapListener;
70import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080071import org.slf4j.Logger;
72
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.net.URI;
74import java.util.ArrayList;
75import java.util.HashMap;
76import java.util.List;
77import java.util.Objects;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.ConcurrentMap;
80import java.util.concurrent.ExecutorService;
81import java.util.concurrent.Executors;
82import java.util.concurrent.atomic.AtomicInteger;
83import java.util.concurrent.atomic.AtomicLong;
84import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
90/**
91 * Manages inventory of group entries using trivial in-memory implementation.
92 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 protected StorageService storageService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700114 protected MastershipService mastershipService;
115
116 // Per device group table with (device id + app cookie) as key
117 private EventuallyConsistentMap<GroupStoreKeyMapKey,
118 StoredGroupEntry> groupStoreEntriesByKey = null;
119 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700120 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
121 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
125 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private ExecutorService messageHandlingExecutor;
127 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800128
129 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
130 new HashMap<DeviceId, Boolean>();
131
132 private final AtomicInteger groupIdGen = new AtomicInteger();
133
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 private KryoNamespace.Builder kryoBuilder = null;
135
alshabib10580802015-02-18 18:30:33 -0800136 @Activate
137 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 kryoBuilder = new KryoNamespace.Builder()
139 .register(DefaultGroup.class,
140 DefaultGroupBucket.class,
141 DefaultGroupDescription.class,
142 DefaultGroupKey.class,
143 GroupDescription.Type.class,
144 Group.GroupState.class,
145 GroupBuckets.class,
146 DefaultGroupId.class,
147 GroupStoreMessage.class,
148 GroupStoreMessage.Type.class,
149 UpdateType.class,
150 GroupStoreMessageSubjects.class,
151 MultiValuedTimestamp.class,
152 GroupStoreKeyMapKey.class,
153 GroupStoreIdMapKey.class,
154 GroupStoreMapKey.class
155 )
156 .register(URI.class)
157 .register(DeviceId.class)
158 .register(PortNumber.class)
159 .register(DefaultApplicationId.class)
160 .register(DefaultTrafficTreatment.class,
161 Instructions.DropInstruction.class,
162 Instructions.OutputInstruction.class,
163 Instructions.GroupInstruction.class,
164 Instructions.TableTypeTransition.class,
165 FlowRule.Type.class,
166 L0ModificationInstruction.class,
167 L0ModificationInstruction.L0SubType.class,
168 L0ModificationInstruction.ModLambdaInstruction.class,
169 L2ModificationInstruction.class,
170 L2ModificationInstruction.L2SubType.class,
171 L2ModificationInstruction.ModEtherInstruction.class,
172 L2ModificationInstruction.PushHeaderInstructions.class,
173 L2ModificationInstruction.ModVlanIdInstruction.class,
174 L2ModificationInstruction.ModVlanPcpInstruction.class,
175 L2ModificationInstruction.ModMplsLabelInstruction.class,
176 L2ModificationInstruction.ModMplsTtlInstruction.class,
177 L3ModificationInstruction.class,
178 L3ModificationInstruction.L3SubType.class,
179 L3ModificationInstruction.ModIPInstruction.class,
180 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
181 L3ModificationInstruction.ModTtlInstruction.class,
182 org.onlab.packet.MplsLabel.class
183 )
184 .register(org.onosproject.cluster.NodeId.class)
185 .register(KryoNamespaces.BASIC)
186 .register(KryoNamespaces.MISC);
187
188 messageHandlingExecutor = Executors.
189 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
190 groupedThreads("onos/store/group",
191 "message-handlers"));
192 clusterCommunicator.
193 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
194 new ClusterGroupMsgHandler(),
195 messageHandlingExecutor);
196
197 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700198 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
199 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
200
201 groupStoreEntriesByKey = keyMapBuilder
202 .withName("groupstorekeymap")
203 .withSerializer(kryoBuilder)
204 .withClockService(new GroupStoreLogicalClockManager<>())
205 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700206 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700207 log.trace("Current size {}", groupStoreEntriesByKey.size());
208
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700209 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700210 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
211 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
212
213 auditPendingReqQueue = auditMapBuilder
214 .withName("pendinggroupkeymap")
215 .withSerializer(kryoBuilder)
216 .withClockService(new GroupStoreLogicalClockManager<>())
217 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 log.trace("Current size {}", auditPendingReqQueue.size());
219
alshabib10580802015-02-18 18:30:33 -0800220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700225 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700226 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800227 log.info("Stopped");
228 }
229
alshabib10580802015-02-18 18:30:33 -0800230 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700231 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800232 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
233 }
234
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700235 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
236 lazyEmptyGroupIdTable() {
237 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
238 }
239
alshabib10580802015-02-18 18:30:33 -0800240 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700241 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800242 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700243 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800244 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700245 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
246 getGroupStoreKeyMap() {
247 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800248 }
249
250 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700251 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800252 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700253 * @param deviceId identifier of the device
254 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700256 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
257 return createIfAbsentUnchecked(groupEntriesById,
258 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700262 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700264 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800265 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700266 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
267 getPendingGroupKeyTable() {
268 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800269 }
270
271 /**
272 * Returns the extraneous group id table for specified device.
273 *
274 * @param deviceId identifier of the device
275 * @return Map representing group key table of given device.
276 */
277 private ConcurrentMap<GroupId, Group>
278 getExtraneousGroupIdTable(DeviceId deviceId) {
279 return createIfAbsentUnchecked(extraneousGroupEntriesById,
280 deviceId,
281 lazyEmptyExtraneousGroupIdTable());
282 }
283
284 /**
285 * Returns the number of groups for the specified device in the store.
286 *
287 * @return number of groups for the specified device
288 */
289 @Override
290 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700291 return (getGroups(deviceId) != null) ?
292 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800293 }
294
295 /**
296 * Returns the groups associated with a device.
297 *
298 * @param deviceId the device ID
299 *
300 * @return the group entries
301 */
302 @Override
303 public Iterable<Group> getGroups(DeviceId deviceId) {
304 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700305 log.trace("getGroups: for device {} total number of groups {}",
306 deviceId, getGroupStoreKeyMap().values().size());
307 return FluentIterable.from(getGroupStoreKeyMap().values())
308 .filter(input -> input.deviceId().equals(deviceId))
309 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800310 }
311
312 /**
313 * Returns the stored group entry.
314 *
315 * @param deviceId the device ID
316 * @param appCookie the group key
317 *
318 * @return a group associated with the key
319 */
320 @Override
321 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700322 return getStoredGroupEntry(deviceId, appCookie);
323 }
324
325 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
326 GroupKey appCookie) {
327 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
328 appCookie));
329 }
330
331 @Override
332 public Group getGroup(DeviceId deviceId, GroupId groupId) {
333 return getStoredGroupEntry(deviceId, groupId);
334 }
335
336 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
337 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700338 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800339 }
340
341 private int getFreeGroupIdValue(DeviceId deviceId) {
342 int freeId = groupIdGen.incrementAndGet();
343
344 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700345 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800346 if (existing == null) {
347 existing = (
348 extraneousGroupEntriesById.get(deviceId) != null) ?
349 extraneousGroupEntriesById.get(deviceId).
350 get(new DefaultGroupId(freeId)) :
351 null;
352 }
353 if (existing != null) {
354 freeId = groupIdGen.incrementAndGet();
355 } else {
356 break;
357 }
358 }
359 return freeId;
360 }
361
362 /**
363 * Stores a new group entry using the information from group description.
364 *
365 * @param groupDesc group description to be used to create group entry
366 */
367 @Override
368 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700369 log.trace("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800370 // Check if a group is existing with the same key
371 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700372 log.warn("Group already exists with the same key {}",
373 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800374 return;
375 }
376
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700377 // Check if group to be created by a remote instance
378 if (mastershipService.getLocalRole(
379 groupDesc.deviceId()) != MastershipRole.MASTER) {
380 log.debug("Device {} local role is not MASTER",
381 groupDesc.deviceId());
382 GroupStoreMessage groupOp = GroupStoreMessage.
383 createGroupAddRequestMsg(groupDesc.deviceId(),
384 groupDesc);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700385
386 if (!clusterCommunicator.unicast(groupOp,
387 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
388 m -> kryoBuilder.build().serialize(m),
389 mastershipService.getMasterFor(groupDesc.deviceId()))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700390 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700391 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700392 mastershipService.getMasterFor(groupDesc.deviceId()));
393 //TODO: Send Group operation failure event
394 }
395 log.debug("Sent Group operation request for device {} "
396 + "to remote MASTER {}",
397 groupDesc.deviceId(),
398 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800399 return;
400 }
401
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700402 log.debug("Store group for device {} is getting handled locally",
403 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800404 storeGroupDescriptionInternal(groupDesc);
405 }
406
407 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
408 // Check if a group is existing with the same key
409 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
410 return;
411 }
412
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700413 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
414 // Device group audit has not completed yet
415 // Add this group description to pending group key table
416 // Create a group entry object with Dummy Group ID
417 log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
418 + "pending...Queuing Group ADD request",
419 groupDesc.deviceId());
420 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
421 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
422 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
423 getPendingGroupKeyTable();
424 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
425 groupDesc.appCookie()),
426 group);
427 return;
428 }
429
alshabib10580802015-02-18 18:30:33 -0800430 // Get a new group identifier
431 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
432 // Create a group entry object
433 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700434 // Insert the newly created group entry into key and id maps
435 getGroupStoreKeyMap().
436 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
437 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700438 // Ensure it also inserted into group id based table to
439 // avoid any chances of duplication in group id generation
440 getGroupIdTable(groupDesc.deviceId()).
441 put(id, group);
alshabib10580802015-02-18 18:30:33 -0800442 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
443 group));
444 }
445
446 /**
447 * Updates the existing group entry with the information
448 * from group description.
449 *
450 * @param deviceId the device ID
451 * @param oldAppCookie the current group key
452 * @param type update type
453 * @param newBuckets group buckets for updates
454 * @param newAppCookie optional new group key
455 */
456 @Override
457 public void updateGroupDescription(DeviceId deviceId,
458 GroupKey oldAppCookie,
459 UpdateType type,
460 GroupBuckets newBuckets,
461 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700462 // Check if group update to be done by a remote instance
463 if (mastershipService.
464 getLocalRole(deviceId) != MastershipRole.MASTER) {
465 GroupStoreMessage groupOp = GroupStoreMessage.
466 createGroupUpdateRequestMsg(deviceId,
467 oldAppCookie,
468 type,
469 newBuckets,
470 newAppCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700471
472 if (!clusterCommunicator.unicast(groupOp,
473 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
474 m -> kryoBuilder.build().serialize(m),
475 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700476 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700477 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700478 mastershipService.getMasterFor(deviceId));
479 //TODO: Send Group operation failure event
480 }
481 return;
482 }
483 updateGroupDescriptionInternal(deviceId,
484 oldAppCookie,
485 type,
486 newBuckets,
487 newAppCookie);
488 }
489
490 private void updateGroupDescriptionInternal(DeviceId deviceId,
491 GroupKey oldAppCookie,
492 UpdateType type,
493 GroupBuckets newBuckets,
494 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800495 // Check if a group is existing with the provided key
496 Group oldGroup = getGroup(deviceId, oldAppCookie);
497 if (oldGroup == null) {
498 return;
499 }
500
501 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
502 type,
503 newBuckets);
504 if (newBucketList != null) {
505 // Create a new group object from the old group
506 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
507 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
508 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
509 oldGroup.deviceId(),
510 oldGroup.type(),
511 updatedBuckets,
512 newCookie,
513 oldGroup.appId());
514 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
515 updatedGroupDesc);
516 newGroup.setState(GroupState.PENDING_UPDATE);
517 newGroup.setLife(oldGroup.life());
518 newGroup.setPackets(oldGroup.packets());
519 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700520 //Update the group entry in groupkey based map.
521 //Update to groupid based map will happen in the
522 //groupkey based map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700523 getGroupStoreKeyMap().
524 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
525 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800526 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
527 }
528 }
529
530 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
531 UpdateType type,
532 GroupBuckets buckets) {
533 GroupBuckets oldBuckets = oldGroup.buckets();
534 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
535 oldBuckets.buckets());
536 boolean groupDescUpdated = false;
537
538 if (type == UpdateType.ADD) {
539 // Check if the any of the new buckets are part of
540 // the old bucket list
541 for (GroupBucket addBucket:buckets.buckets()) {
542 if (!newBucketList.contains(addBucket)) {
543 newBucketList.add(addBucket);
544 groupDescUpdated = true;
545 }
546 }
547 } else if (type == UpdateType.REMOVE) {
548 // Check if the to be removed buckets are part of the
549 // old bucket list
550 for (GroupBucket removeBucket:buckets.buckets()) {
551 if (newBucketList.contains(removeBucket)) {
552 newBucketList.remove(removeBucket);
553 groupDescUpdated = true;
554 }
555 }
556 }
557
558 if (groupDescUpdated) {
559 return newBucketList;
560 } else {
561 return null;
562 }
563 }
564
565 /**
566 * Triggers deleting the existing group entry.
567 *
568 * @param deviceId the device ID
569 * @param appCookie the group key
570 */
571 @Override
572 public void deleteGroupDescription(DeviceId deviceId,
573 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700574 // Check if group to be deleted by a remote instance
575 if (mastershipService.
576 getLocalRole(deviceId) != MastershipRole.MASTER) {
577 GroupStoreMessage groupOp = GroupStoreMessage.
578 createGroupDeleteRequestMsg(deviceId,
579 appCookie);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700580
581 if (!clusterCommunicator.unicast(groupOp,
582 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
583 m -> kryoBuilder.build().serialize(m),
584 mastershipService.getMasterFor(deviceId))) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700585 log.warn("Failed to send request to master: {} to {}",
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700586 groupOp,
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700587 mastershipService.getMasterFor(deviceId));
588 //TODO: Send Group operation failure event
589 }
590 return;
591 }
592 deleteGroupDescriptionInternal(deviceId, appCookie);
593 }
594
595 private void deleteGroupDescriptionInternal(DeviceId deviceId,
596 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800597 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700598 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800599 if (existing == null) {
600 return;
601 }
602
603 synchronized (existing) {
604 existing.setState(GroupState.PENDING_DELETE);
605 }
606 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
607 }
608
609 /**
610 * Stores a new group entry, or updates an existing entry.
611 *
612 * @param group group entry
613 */
614 @Override
615 public void addOrUpdateGroupEntry(Group group) {
616 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700617 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
618 group.id());
alshabib10580802015-02-18 18:30:33 -0800619 GroupEvent event = null;
620
621 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700622 log.trace("addOrUpdateGroupEntry: updating group "
623 + "entry {} in device {}",
624 group.id(),
625 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800626 synchronized (existing) {
627 existing.setLife(group.life());
628 existing.setPackets(group.packets());
629 existing.setBytes(group.bytes());
630 if (existing.state() == GroupState.PENDING_ADD) {
631 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700632 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800633 event = new GroupEvent(Type.GROUP_ADDED, existing);
634 } else {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700635 existing.setState(GroupState.ADDED);
636 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800637 event = new GroupEvent(Type.GROUP_UPDATED, existing);
638 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700639 //Re-PUT map entries to trigger map update events
640 getGroupStoreKeyMap().
641 put(new GroupStoreKeyMapKey(existing.deviceId(),
642 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800643 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700644 } else {
645 log.warn("addOrUpdateGroupEntry: Group update "
646 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800647 }
648
649 if (event != null) {
650 notifyDelegate(event);
651 }
652 }
653
654 /**
655 * Removes the group entry from store.
656 *
657 * @param group group entry
658 */
659 @Override
660 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700661 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
662 group.id());
alshabib10580802015-02-18 18:30:33 -0800663
664 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700665 log.trace("removeGroupEntry: removing group "
666 + "entry {} in device {}",
667 group.id(),
668 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700669 //Removal from groupid based map will happen in the
670 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700671 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
672 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800673 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
674 }
675 }
676
677 @Override
678 public void deviceInitialAuditCompleted(DeviceId deviceId,
679 boolean completed) {
680 synchronized (deviceAuditStatus) {
681 if (completed) {
682 log.debug("deviceInitialAuditCompleted: AUDIT "
683 + "completed for device {}", deviceId);
684 deviceAuditStatus.put(deviceId, true);
685 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700686 List<StoredGroupEntry> pendingGroupRequests =
687 getPendingGroupKeyTable().values()
688 .stream()
689 .filter(g-> g.deviceId().equals(deviceId))
690 .collect(Collectors.toList());
691 log.trace("deviceInitialAuditCompleted: processing "
692 + "pending group add requests for device {} and "
693 + "number of pending requests {}",
694 deviceId,
695 pendingGroupRequests.size());
696 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800697 GroupDescription tmp = new DefaultGroupDescription(
698 group.deviceId(),
699 group.type(),
700 group.buckets(),
701 group.appCookie(),
702 group.appId());
703 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700704 getPendingGroupKeyTable().
705 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800706 }
alshabib10580802015-02-18 18:30:33 -0800707 } else {
708 if (deviceAuditStatus.get(deviceId)) {
709 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
710 + "status for device {}", deviceId);
711 deviceAuditStatus.put(deviceId, false);
712 }
713 }
714 }
715 }
716
717 @Override
718 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
719 synchronized (deviceAuditStatus) {
720 return (deviceAuditStatus.get(deviceId) != null)
721 ? deviceAuditStatus.get(deviceId) : false;
722 }
723 }
724
725 @Override
726 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
727
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700728 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
729 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800730
731 if (existing == null) {
732 log.warn("No group entry with ID {} found ", operation.groupId());
733 return;
734 }
735
736 switch (operation.opType()) {
737 case ADD:
738 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
739 break;
740 case MODIFY:
741 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
742 break;
743 case DELETE:
744 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
745 break;
746 default:
747 log.warn("Unknown group operation type {}", operation.opType());
748 }
749
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700750 //Removal from groupid based map will happen in the
751 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700752 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
753 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800754 }
755
756 @Override
757 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700758 log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
759 + "group entry {} in device {}",
760 group.id(),
761 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800762 ConcurrentMap<GroupId, Group> extraneousIdTable =
763 getExtraneousGroupIdTable(group.deviceId());
764 extraneousIdTable.put(group.id(), group);
765 // Check the reference counter
766 if (group.referenceCount() == 0) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700767 log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
768 + "counter is zero and triggering remove",
769 group.id(),
770 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800771 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
772 }
773 }
774
775 @Override
776 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700777 log.trace("removeExtraneousGroupEntry: remove extraneous "
778 + "group entry {} of device {} from store",
779 group.id(),
780 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800781 ConcurrentMap<GroupId, Group> extraneousIdTable =
782 getExtraneousGroupIdTable(group.deviceId());
783 extraneousIdTable.remove(group.id());
784 }
785
786 @Override
787 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
788 // flatten and make iterator unmodifiable
789 return FluentIterable.from(
790 getExtraneousGroupIdTable(deviceId).values());
791 }
792
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700793 /**
794 * ClockService that generates wallclock based timestamps.
795 */
796 private class GroupStoreLogicalClockManager<T, U>
797 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800798
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700799 private final AtomicLong sequenceNumber = new AtomicLong(0);
800
801 @Override
802 public Timestamp getTimestamp(T t1, U u1) {
803 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
804 sequenceNumber.getAndIncrement());
805 }
806 }
807
808 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700809 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700810 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700811 private class GroupStoreKeyMapListener implements
812 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700813
814 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700815 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700816 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700817 GroupEvent groupEvent = null;
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700818 StoredGroupEntry group = mapEvent.value();
819 log.trace("GroupStoreKeyMapListener: received groupid map event {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700820 mapEvent.type());
821 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700822 log.trace("GroupStoreKeyMapListener: Received PUT event");
823 // Update the group ID table
824 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700825 if (mapEvent.value().state() == Group.GroupState.ADDED) {
826 if (mapEvent.value().isGroupStateAddedFirstTime()) {
827 groupEvent = new GroupEvent(Type.GROUP_ADDED,
828 mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700829 log.trace("GroupStoreKeyMapListener: Received first time "
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700830 + "GROUP_ADDED state update");
831 } else {
832 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
833 mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700834 log.trace("GroupStoreKeyMapListener: Received following "
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700835 + "GROUP_ADDED state update");
836 }
837 }
838 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700839 log.trace("GroupStoreKeyMapListener: Received REMOVE event");
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700841 // Remove the entry from the group ID table
842 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700843 }
844
845 if (groupEvent != null) {
846 notifyDelegate(groupEvent);
847 }
848 }
849 }
850 /**
851 * Message handler to receive messages from group subsystems of
852 * other cluster members.
853 */
854 private final class ClusterGroupMsgHandler
855 implements ClusterMessageHandler {
856 @Override
857 public void handle(ClusterMessage message) {
858 log.trace("ClusterGroupMsgHandler: received remote group message");
859 if (message.subject() ==
860 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
861 GroupStoreMessage groupOp = kryoBuilder.
862 build().deserialize(message.payload());
863 log.trace("received remote group operation request");
864 if (!(mastershipService.
865 getLocalRole(groupOp.deviceId()) !=
866 MastershipRole.MASTER)) {
867 log.warn("ClusterGroupMsgHandler: This node is not "
868 + "MASTER for device {}", groupOp.deviceId());
869 return;
870 }
871 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
872 log.trace("processing remote group "
873 + "add operation request");
874 storeGroupDescriptionInternal(groupOp.groupDesc());
875 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
876 log.trace("processing remote group "
877 + "update operation request");
878 updateGroupDescriptionInternal(groupOp.deviceId(),
879 groupOp.appCookie(),
880 groupOp.updateType(),
881 groupOp.updateBuckets(),
882 groupOp.newAppCookie());
883 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
884 log.trace("processing remote group "
885 + "delete operation request");
886 deleteGroupDescriptionInternal(groupOp.deviceId(),
887 groupOp.appCookie());
888 }
889 }
890 }
891 }
892
893 /**
894 * Flattened map key to be used to store group entries.
895 */
896 private class GroupStoreMapKey {
897 private final DeviceId deviceId;
898
899 public GroupStoreMapKey(DeviceId deviceId) {
900 this.deviceId = deviceId;
901 }
902
903 @Override
904 public boolean equals(Object o) {
905 if (this == o) {
906 return true;
907 }
908 if (!(o instanceof GroupStoreMapKey)) {
909 return false;
910 }
911 GroupStoreMapKey that = (GroupStoreMapKey) o;
912 return this.deviceId.equals(that.deviceId);
913 }
914
915 @Override
916 public int hashCode() {
917 int result = 17;
918
919 result = 31 * result + Objects.hash(this.deviceId);
920
921 return result;
922 }
923 }
924
925 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
926 private final GroupKey appCookie;
927 public GroupStoreKeyMapKey(DeviceId deviceId,
928 GroupKey appCookie) {
929 super(deviceId);
930 this.appCookie = appCookie;
931 }
932
933 @Override
934 public boolean equals(Object o) {
935 if (this == o) {
936 return true;
937 }
938 if (!(o instanceof GroupStoreKeyMapKey)) {
939 return false;
940 }
941 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
942 return (super.equals(that) &&
943 this.appCookie.equals(that.appCookie));
944 }
945
946 @Override
947 public int hashCode() {
948 int result = 17;
949
950 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
951
952 return result;
953 }
954 }
955
956 private class GroupStoreIdMapKey extends GroupStoreMapKey {
957 private final GroupId groupId;
958 public GroupStoreIdMapKey(DeviceId deviceId,
959 GroupId groupId) {
960 super(deviceId);
961 this.groupId = groupId;
962 }
963
964 @Override
965 public boolean equals(Object o) {
966 if (this == o) {
967 return true;
968 }
969 if (!(o instanceof GroupStoreIdMapKey)) {
970 return false;
971 }
972 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
973 return (super.equals(that) &&
974 this.groupId.equals(that.groupId));
975 }
976
977 @Override
978 public int hashCode() {
979 int result = 17;
980
981 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
982
983 return result;
984 }
985 }
alshabib10580802015-02-18 18:30:33 -0800986}