blob: 51b4111c0825b1fdd625eb880a7fe5a849b97a90 [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
alshabib10580802015-02-18 18:30:33 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080025import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080027import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080030import org.onosproject.core.DefaultGroupId;
31import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.net.MastershipRole;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.flow.instructions.L0ModificationInstruction;
40import org.onosproject.net.flow.instructions.L2ModificationInstruction;
41import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070043import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.Group;
47import org.onosproject.net.group.Group.GroupState;
48import org.onosproject.net.group.GroupBucket;
49import org.onosproject.net.group.GroupBuckets;
50import org.onosproject.net.group.GroupDescription;
51import org.onosproject.net.group.GroupEvent;
52import org.onosproject.net.group.GroupEvent.Type;
53import org.onosproject.net.group.GroupKey;
54import org.onosproject.net.group.GroupOperation;
55import org.onosproject.net.group.GroupStore;
56import org.onosproject.net.group.GroupStoreDelegate;
57import org.onosproject.net.group.StoredGroupEntry;
58import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070059import org.onosproject.store.Timestamp;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070063import org.onosproject.store.impl.MultiValuedTimestamp;
64import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070065import org.onosproject.store.service.ClockService;
66import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapBuilder;
68import org.onosproject.store.service.EventuallyConsistentMapEvent;
69import org.onosproject.store.service.EventuallyConsistentMapListener;
70import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080071import org.slf4j.Logger;
72
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.net.URI;
74import java.util.ArrayList;
75import java.util.HashMap;
76import java.util.List;
77import java.util.Objects;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.ConcurrentMap;
80import java.util.concurrent.ExecutorService;
81import java.util.concurrent.Executors;
82import java.util.concurrent.atomic.AtomicInteger;
83import java.util.concurrent.atomic.AtomicLong;
84import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
90/**
 * Manages inventory of group entries using eventually consistent maps shared across the cluster.
92 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 protected StorageService storageService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700114 protected MastershipService mastershipService;
115
116 // Per device group table with (device id + app cookie) as key
117 private EventuallyConsistentMap<GroupStoreKeyMapKey,
118 StoredGroupEntry> groupStoreEntriesByKey = null;
119 // Per device group table with (device id + group id) as key
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700120 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
121 groupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
125 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private ExecutorService messageHandlingExecutor;
127 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800128
129 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
130 new HashMap<DeviceId, Boolean>();
131
132 private final AtomicInteger groupIdGen = new AtomicInteger();
133
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 private KryoNamespace.Builder kryoBuilder = null;
135
alshabib10580802015-02-18 18:30:33 -0800136 @Activate
137 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 kryoBuilder = new KryoNamespace.Builder()
139 .register(DefaultGroup.class,
140 DefaultGroupBucket.class,
141 DefaultGroupDescription.class,
142 DefaultGroupKey.class,
143 GroupDescription.Type.class,
144 Group.GroupState.class,
145 GroupBuckets.class,
146 DefaultGroupId.class,
147 GroupStoreMessage.class,
148 GroupStoreMessage.Type.class,
149 UpdateType.class,
150 GroupStoreMessageSubjects.class,
151 MultiValuedTimestamp.class,
152 GroupStoreKeyMapKey.class,
153 GroupStoreIdMapKey.class,
154 GroupStoreMapKey.class
155 )
156 .register(URI.class)
157 .register(DeviceId.class)
158 .register(PortNumber.class)
159 .register(DefaultApplicationId.class)
160 .register(DefaultTrafficTreatment.class,
161 Instructions.DropInstruction.class,
162 Instructions.OutputInstruction.class,
163 Instructions.GroupInstruction.class,
164 Instructions.TableTypeTransition.class,
165 FlowRule.Type.class,
166 L0ModificationInstruction.class,
167 L0ModificationInstruction.L0SubType.class,
168 L0ModificationInstruction.ModLambdaInstruction.class,
169 L2ModificationInstruction.class,
170 L2ModificationInstruction.L2SubType.class,
171 L2ModificationInstruction.ModEtherInstruction.class,
172 L2ModificationInstruction.PushHeaderInstructions.class,
173 L2ModificationInstruction.ModVlanIdInstruction.class,
174 L2ModificationInstruction.ModVlanPcpInstruction.class,
175 L2ModificationInstruction.ModMplsLabelInstruction.class,
176 L2ModificationInstruction.ModMplsTtlInstruction.class,
177 L3ModificationInstruction.class,
178 L3ModificationInstruction.L3SubType.class,
179 L3ModificationInstruction.ModIPInstruction.class,
180 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
181 L3ModificationInstruction.ModTtlInstruction.class,
182 org.onlab.packet.MplsLabel.class
183 )
184 .register(org.onosproject.cluster.NodeId.class)
185 .register(KryoNamespaces.BASIC)
186 .register(KryoNamespaces.MISC);
187
188 messageHandlingExecutor = Executors.
189 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
190 groupedThreads("onos/store/group",
191 "message-handlers"));
192 clusterCommunicator.
193 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
194 new ClusterGroupMsgHandler(),
195 messageHandlingExecutor);
196
197 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700198 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
199 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
200
201 groupStoreEntriesByKey = keyMapBuilder
202 .withName("groupstorekeymap")
203 .withSerializer(kryoBuilder)
204 .withClockService(new GroupStoreLogicalClockManager<>())
205 .build();
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700206 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700207 log.trace("Current size {}", groupStoreEntriesByKey.size());
208
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700209 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700210 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
211 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
212
213 auditPendingReqQueue = auditMapBuilder
214 .withName("pendinggroupkeymap")
215 .withSerializer(kryoBuilder)
216 .withClockService(new GroupStoreLogicalClockManager<>())
217 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 log.trace("Current size {}", auditPendingReqQueue.size());
219
alshabib10580802015-02-18 18:30:33 -0800220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700225 groupStoreEntriesByKey.destroy();
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700226 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800227 log.info("Stopped");
228 }
229
alshabib10580802015-02-18 18:30:33 -0800230 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700231 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800232 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
233 }
234
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700235 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
236 lazyEmptyGroupIdTable() {
237 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
238 }
239
alshabib10580802015-02-18 18:30:33 -0800240 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700241 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800242 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700243 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800244 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700245 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
246 getGroupStoreKeyMap() {
247 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800248 }
249
250 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700251 * Returns the group id table for specified device.
alshabib10580802015-02-18 18:30:33 -0800252 *
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700253 * @param deviceId identifier of the device
254 * @return Map representing group key table of given device.
alshabib10580802015-02-18 18:30:33 -0800255 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700256 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
257 return createIfAbsentUnchecked(groupEntriesById,
258 deviceId, lazyEmptyGroupIdTable());
alshabib10580802015-02-18 18:30:33 -0800259 }
260
261 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700262 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800263 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700264 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800265 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700266 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
267 getPendingGroupKeyTable() {
268 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800269 }
270
271 /**
272 * Returns the extraneous group id table for specified device.
273 *
274 * @param deviceId identifier of the device
275 * @return Map representing group key table of given device.
276 */
277 private ConcurrentMap<GroupId, Group>
278 getExtraneousGroupIdTable(DeviceId deviceId) {
279 return createIfAbsentUnchecked(extraneousGroupEntriesById,
280 deviceId,
281 lazyEmptyExtraneousGroupIdTable());
282 }
283
284 /**
285 * Returns the number of groups for the specified device in the store.
286 *
287 * @return number of groups for the specified device
288 */
289 @Override
290 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700291 return (getGroups(deviceId) != null) ?
292 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800293 }
294
295 /**
296 * Returns the groups associated with a device.
297 *
298 * @param deviceId the device ID
299 *
300 * @return the group entries
301 */
302 @Override
303 public Iterable<Group> getGroups(DeviceId deviceId) {
304 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700305 log.trace("getGroups: for device {} total number of groups {}",
306 deviceId, getGroupStoreKeyMap().values().size());
307 return FluentIterable.from(getGroupStoreKeyMap().values())
308 .filter(input -> input.deviceId().equals(deviceId))
309 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800310 }
311
312 /**
313 * Returns the stored group entry.
314 *
315 * @param deviceId the device ID
316 * @param appCookie the group key
317 *
318 * @return a group associated with the key
319 */
320 @Override
321 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700322 return getStoredGroupEntry(deviceId, appCookie);
323 }
324
325 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
326 GroupKey appCookie) {
327 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
328 appCookie));
329 }
330
331 @Override
332 public Group getGroup(DeviceId deviceId, GroupId groupId) {
333 return getStoredGroupEntry(deviceId, groupId);
334 }
335
336 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
337 GroupId groupId) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700338 return getGroupIdTable(deviceId).get(groupId);
alshabib10580802015-02-18 18:30:33 -0800339 }
340
341 private int getFreeGroupIdValue(DeviceId deviceId) {
342 int freeId = groupIdGen.incrementAndGet();
343
344 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700345 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800346 if (existing == null) {
347 existing = (
348 extraneousGroupEntriesById.get(deviceId) != null) ?
349 extraneousGroupEntriesById.get(deviceId).
350 get(new DefaultGroupId(freeId)) :
351 null;
352 }
353 if (existing != null) {
354 freeId = groupIdGen.incrementAndGet();
355 } else {
356 break;
357 }
358 }
359 return freeId;
360 }
361
362 /**
363 * Stores a new group entry using the information from group description.
364 *
365 * @param groupDesc group description to be used to create group entry
366 */
367 @Override
368 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700369 log.trace("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800370 // Check if a group is existing with the same key
371 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700372 log.warn("Group already exists with the same key {}",
373 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800374 return;
375 }
376
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700377 // Check if group to be created by a remote instance
378 if (mastershipService.getLocalRole(
379 groupDesc.deviceId()) != MastershipRole.MASTER) {
380 log.debug("Device {} local role is not MASTER",
381 groupDesc.deviceId());
382 GroupStoreMessage groupOp = GroupStoreMessage.
383 createGroupAddRequestMsg(groupDesc.deviceId(),
384 groupDesc);
385 ClusterMessage message = new ClusterMessage(
386 clusterService.getLocalNode().id(),
387 GroupStoreMessageSubjects.
388 REMOTE_GROUP_OP_REQUEST,
389 kryoBuilder.build().serialize(groupOp));
390 if (!clusterCommunicator.unicast(message,
391 mastershipService.
392 getMasterFor(
393 groupDesc.deviceId()))) {
394 log.warn("Failed to send request to master: {} to {}",
395 message,
396 mastershipService.getMasterFor(groupDesc.deviceId()));
397 //TODO: Send Group operation failure event
398 }
399 log.debug("Sent Group operation request for device {} "
400 + "to remote MASTER {}",
401 groupDesc.deviceId(),
402 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800403 return;
404 }
405
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700406 log.debug("Store group for device {} is getting handled locally",
407 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800408 storeGroupDescriptionInternal(groupDesc);
409 }
410
411 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
412 // Check if a group is existing with the same key
413 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
414 return;
415 }
416
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700417 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
418 // Device group audit has not completed yet
419 // Add this group description to pending group key table
420 // Create a group entry object with Dummy Group ID
421 log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
422 + "pending...Queuing Group ADD request",
423 groupDesc.deviceId());
424 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
425 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
426 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
427 getPendingGroupKeyTable();
428 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
429 groupDesc.appCookie()),
430 group);
431 return;
432 }
433
alshabib10580802015-02-18 18:30:33 -0800434 // Get a new group identifier
435 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
436 // Create a group entry object
437 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700438 // Insert the newly created group entry into key and id maps
439 getGroupStoreKeyMap().
440 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
441 groupDesc.appCookie()), group);
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700442 // Ensure it also inserted into group id based table to
443 // avoid any chances of duplication in group id generation
444 getGroupIdTable(groupDesc.deviceId()).
445 put(id, group);
alshabib10580802015-02-18 18:30:33 -0800446 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
447 group));
448 }
449
450 /**
451 * Updates the existing group entry with the information
452 * from group description.
453 *
454 * @param deviceId the device ID
455 * @param oldAppCookie the current group key
456 * @param type update type
457 * @param newBuckets group buckets for updates
458 * @param newAppCookie optional new group key
459 */
460 @Override
461 public void updateGroupDescription(DeviceId deviceId,
462 GroupKey oldAppCookie,
463 UpdateType type,
464 GroupBuckets newBuckets,
465 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700466 // Check if group update to be done by a remote instance
467 if (mastershipService.
468 getLocalRole(deviceId) != MastershipRole.MASTER) {
469 GroupStoreMessage groupOp = GroupStoreMessage.
470 createGroupUpdateRequestMsg(deviceId,
471 oldAppCookie,
472 type,
473 newBuckets,
474 newAppCookie);
475 ClusterMessage message =
476 new ClusterMessage(clusterService.getLocalNode().id(),
477 GroupStoreMessageSubjects.
478 REMOTE_GROUP_OP_REQUEST,
479 kryoBuilder.build().serialize(groupOp));
480 if (!clusterCommunicator.unicast(message,
481 mastershipService.
482 getMasterFor(deviceId))) {
483 log.warn("Failed to send request to master: {} to {}",
484 message,
485 mastershipService.getMasterFor(deviceId));
486 //TODO: Send Group operation failure event
487 }
488 return;
489 }
490 updateGroupDescriptionInternal(deviceId,
491 oldAppCookie,
492 type,
493 newBuckets,
494 newAppCookie);
495 }
496
497 private void updateGroupDescriptionInternal(DeviceId deviceId,
498 GroupKey oldAppCookie,
499 UpdateType type,
500 GroupBuckets newBuckets,
501 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800502 // Check if a group is existing with the provided key
503 Group oldGroup = getGroup(deviceId, oldAppCookie);
504 if (oldGroup == null) {
505 return;
506 }
507
508 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
509 type,
510 newBuckets);
511 if (newBucketList != null) {
512 // Create a new group object from the old group
513 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
514 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
515 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
516 oldGroup.deviceId(),
517 oldGroup.type(),
518 updatedBuckets,
519 newCookie,
520 oldGroup.appId());
521 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
522 updatedGroupDesc);
523 newGroup.setState(GroupState.PENDING_UPDATE);
524 newGroup.setLife(oldGroup.life());
525 newGroup.setPackets(oldGroup.packets());
526 newGroup.setBytes(oldGroup.bytes());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700527 //Update the group entry in groupkey based map.
528 //Update to groupid based map will happen in the
529 //groupkey based map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700530 getGroupStoreKeyMap().
531 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
532 newGroup.appCookie()), newGroup);
alshabib10580802015-02-18 18:30:33 -0800533 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
534 }
535 }
536
537 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
538 UpdateType type,
539 GroupBuckets buckets) {
540 GroupBuckets oldBuckets = oldGroup.buckets();
541 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
542 oldBuckets.buckets());
543 boolean groupDescUpdated = false;
544
545 if (type == UpdateType.ADD) {
546 // Check if the any of the new buckets are part of
547 // the old bucket list
548 for (GroupBucket addBucket:buckets.buckets()) {
549 if (!newBucketList.contains(addBucket)) {
550 newBucketList.add(addBucket);
551 groupDescUpdated = true;
552 }
553 }
554 } else if (type == UpdateType.REMOVE) {
555 // Check if the to be removed buckets are part of the
556 // old bucket list
557 for (GroupBucket removeBucket:buckets.buckets()) {
558 if (newBucketList.contains(removeBucket)) {
559 newBucketList.remove(removeBucket);
560 groupDescUpdated = true;
561 }
562 }
563 }
564
565 if (groupDescUpdated) {
566 return newBucketList;
567 } else {
568 return null;
569 }
570 }
571
572 /**
573 * Triggers deleting the existing group entry.
574 *
575 * @param deviceId the device ID
576 * @param appCookie the group key
577 */
578 @Override
579 public void deleteGroupDescription(DeviceId deviceId,
580 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700581 // Check if group to be deleted by a remote instance
582 if (mastershipService.
583 getLocalRole(deviceId) != MastershipRole.MASTER) {
584 GroupStoreMessage groupOp = GroupStoreMessage.
585 createGroupDeleteRequestMsg(deviceId,
586 appCookie);
587 ClusterMessage message =
588 new ClusterMessage(clusterService.getLocalNode().id(),
589 GroupStoreMessageSubjects.
590 REMOTE_GROUP_OP_REQUEST,
591 kryoBuilder.build().serialize(groupOp));
592 if (!clusterCommunicator.unicast(message,
593 mastershipService.
594 getMasterFor(deviceId))) {
595 log.warn("Failed to send request to master: {} to {}",
596 message,
597 mastershipService.getMasterFor(deviceId));
598 //TODO: Send Group operation failure event
599 }
600 return;
601 }
602 deleteGroupDescriptionInternal(deviceId, appCookie);
603 }
604
605 private void deleteGroupDescriptionInternal(DeviceId deviceId,
606 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800607 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700608 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800609 if (existing == null) {
610 return;
611 }
612
613 synchronized (existing) {
614 existing.setState(GroupState.PENDING_DELETE);
615 }
616 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
617 }
618
619 /**
620 * Stores a new group entry, or updates an existing entry.
621 *
622 * @param group group entry
623 */
624 @Override
625 public void addOrUpdateGroupEntry(Group group) {
626 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700627 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
628 group.id());
alshabib10580802015-02-18 18:30:33 -0800629 GroupEvent event = null;
630
631 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700632 log.trace("addOrUpdateGroupEntry: updating group "
633 + "entry {} in device {}",
634 group.id(),
635 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800636 synchronized (existing) {
637 existing.setLife(group.life());
638 existing.setPackets(group.packets());
639 existing.setBytes(group.bytes());
640 if (existing.state() == GroupState.PENDING_ADD) {
641 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700642 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800643 event = new GroupEvent(Type.GROUP_ADDED, existing);
644 } else {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700645 existing.setState(GroupState.ADDED);
646 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800647 event = new GroupEvent(Type.GROUP_UPDATED, existing);
648 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700649 //Re-PUT map entries to trigger map update events
650 getGroupStoreKeyMap().
651 put(new GroupStoreKeyMapKey(existing.deviceId(),
652 existing.appCookie()), existing);
alshabib10580802015-02-18 18:30:33 -0800653 }
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700654 } else {
655 log.warn("addOrUpdateGroupEntry: Group update "
656 + "happening for a non-existing entry in the map");
alshabib10580802015-02-18 18:30:33 -0800657 }
658
659 if (event != null) {
660 notifyDelegate(event);
661 }
662 }
663
664 /**
665 * Removes the group entry from store.
666 *
667 * @param group group entry
668 */
669 @Override
670 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700671 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
672 group.id());
alshabib10580802015-02-18 18:30:33 -0800673
674 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700675 log.trace("removeGroupEntry: removing group "
676 + "entry {} in device {}",
677 group.id(),
678 group.deviceId());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700679 //Removal from groupid based map will happen in the
680 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700681 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
682 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800683 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
684 }
685 }
686
687 @Override
688 public void deviceInitialAuditCompleted(DeviceId deviceId,
689 boolean completed) {
690 synchronized (deviceAuditStatus) {
691 if (completed) {
692 log.debug("deviceInitialAuditCompleted: AUDIT "
693 + "completed for device {}", deviceId);
694 deviceAuditStatus.put(deviceId, true);
695 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700696 List<StoredGroupEntry> pendingGroupRequests =
697 getPendingGroupKeyTable().values()
698 .stream()
699 .filter(g-> g.deviceId().equals(deviceId))
700 .collect(Collectors.toList());
701 log.trace("deviceInitialAuditCompleted: processing "
702 + "pending group add requests for device {} and "
703 + "number of pending requests {}",
704 deviceId,
705 pendingGroupRequests.size());
706 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800707 GroupDescription tmp = new DefaultGroupDescription(
708 group.deviceId(),
709 group.type(),
710 group.buckets(),
711 group.appCookie(),
712 group.appId());
713 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700714 getPendingGroupKeyTable().
715 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800716 }
alshabib10580802015-02-18 18:30:33 -0800717 } else {
718 if (deviceAuditStatus.get(deviceId)) {
719 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
720 + "status for device {}", deviceId);
721 deviceAuditStatus.put(deviceId, false);
722 }
723 }
724 }
725 }
726
727 @Override
728 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
729 synchronized (deviceAuditStatus) {
730 return (deviceAuditStatus.get(deviceId) != null)
731 ? deviceAuditStatus.get(deviceId) : false;
732 }
733 }
734
735 @Override
736 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
737
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700738 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
739 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800740
741 if (existing == null) {
742 log.warn("No group entry with ID {} found ", operation.groupId());
743 return;
744 }
745
746 switch (operation.opType()) {
747 case ADD:
748 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
749 break;
750 case MODIFY:
751 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
752 break;
753 case DELETE:
754 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
755 break;
756 default:
757 log.warn("Unknown group operation type {}", operation.opType());
758 }
759
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700760 //Removal from groupid based map will happen in the
761 //map update listener
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700762 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
763 existing.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800764 }
765
766 @Override
767 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700768 log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
769 + "group entry {} in device {}",
770 group.id(),
771 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800772 ConcurrentMap<GroupId, Group> extraneousIdTable =
773 getExtraneousGroupIdTable(group.deviceId());
774 extraneousIdTable.put(group.id(), group);
775 // Check the reference counter
776 if (group.referenceCount() == 0) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700777 log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
778 + "counter is zero and triggering remove",
779 group.id(),
780 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800781 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
782 }
783 }
784
785 @Override
786 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700787 log.trace("removeExtraneousGroupEntry: remove extraneous "
788 + "group entry {} of device {} from store",
789 group.id(),
790 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800791 ConcurrentMap<GroupId, Group> extraneousIdTable =
792 getExtraneousGroupIdTable(group.deviceId());
793 extraneousIdTable.remove(group.id());
794 }
795
796 @Override
797 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
798 // flatten and make iterator unmodifiable
799 return FluentIterable.from(
800 getExtraneousGroupIdTable(deviceId).values());
801 }
802
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700803 /**
804 * ClockService that generates wallclock based timestamps.
805 */
806 private class GroupStoreLogicalClockManager<T, U>
807 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800808
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700809 private final AtomicLong sequenceNumber = new AtomicLong(0);
810
811 @Override
812 public Timestamp getTimestamp(T t1, U u1) {
813 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
814 sequenceNumber.getAndIncrement());
815 }
816 }
817
818 /**
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700819 * Map handler to receive any events when the group key map is updated.
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700820 */
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700821 private class GroupStoreKeyMapListener implements
822 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700823
824 @Override
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700825 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700826 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700827 GroupEvent groupEvent = null;
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700828 StoredGroupEntry group = mapEvent.value();
829 log.trace("GroupStoreKeyMapListener: received groupid map event {}",
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700830 mapEvent.type());
831 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700832 log.trace("GroupStoreKeyMapListener: Received PUT event");
833 // Update the group ID table
834 getGroupIdTable(group.deviceId()).put(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700835 if (mapEvent.value().state() == Group.GroupState.ADDED) {
836 if (mapEvent.value().isGroupStateAddedFirstTime()) {
837 groupEvent = new GroupEvent(Type.GROUP_ADDED,
838 mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700839 log.trace("GroupStoreKeyMapListener: Received first time "
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 + "GROUP_ADDED state update");
841 } else {
842 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
843 mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700844 log.trace("GroupStoreKeyMapListener: Received following "
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700845 + "GROUP_ADDED state update");
846 }
847 }
848 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700849 log.trace("GroupStoreKeyMapListener: Received REMOVE event");
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700850 groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
Srikanth Vavilapalli6a9d4e42015-03-30 19:41:56 -0700851 // Remove the entry from the group ID table
852 getGroupIdTable(group.deviceId()).remove(group.id(), group);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700853 }
854
855 if (groupEvent != null) {
856 notifyDelegate(groupEvent);
857 }
858 }
859 }
860 /**
861 * Message handler to receive messages from group subsystems of
862 * other cluster members.
863 */
864 private final class ClusterGroupMsgHandler
865 implements ClusterMessageHandler {
866 @Override
867 public void handle(ClusterMessage message) {
868 log.trace("ClusterGroupMsgHandler: received remote group message");
869 if (message.subject() ==
870 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
871 GroupStoreMessage groupOp = kryoBuilder.
872 build().deserialize(message.payload());
873 log.trace("received remote group operation request");
874 if (!(mastershipService.
875 getLocalRole(groupOp.deviceId()) !=
876 MastershipRole.MASTER)) {
877 log.warn("ClusterGroupMsgHandler: This node is not "
878 + "MASTER for device {}", groupOp.deviceId());
879 return;
880 }
881 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
882 log.trace("processing remote group "
883 + "add operation request");
884 storeGroupDescriptionInternal(groupOp.groupDesc());
885 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
886 log.trace("processing remote group "
887 + "update operation request");
888 updateGroupDescriptionInternal(groupOp.deviceId(),
889 groupOp.appCookie(),
890 groupOp.updateType(),
891 groupOp.updateBuckets(),
892 groupOp.newAppCookie());
893 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
894 log.trace("processing remote group "
895 + "delete operation request");
896 deleteGroupDescriptionInternal(groupOp.deviceId(),
897 groupOp.appCookie());
898 }
899 }
900 }
901 }
902
903 /**
904 * Flattened map key to be used to store group entries.
905 */
906 private class GroupStoreMapKey {
907 private final DeviceId deviceId;
908
909 public GroupStoreMapKey(DeviceId deviceId) {
910 this.deviceId = deviceId;
911 }
912
913 @Override
914 public boolean equals(Object o) {
915 if (this == o) {
916 return true;
917 }
918 if (!(o instanceof GroupStoreMapKey)) {
919 return false;
920 }
921 GroupStoreMapKey that = (GroupStoreMapKey) o;
922 return this.deviceId.equals(that.deviceId);
923 }
924
925 @Override
926 public int hashCode() {
927 int result = 17;
928
929 result = 31 * result + Objects.hash(this.deviceId);
930
931 return result;
932 }
933 }
934
935 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
936 private final GroupKey appCookie;
937 public GroupStoreKeyMapKey(DeviceId deviceId,
938 GroupKey appCookie) {
939 super(deviceId);
940 this.appCookie = appCookie;
941 }
942
943 @Override
944 public boolean equals(Object o) {
945 if (this == o) {
946 return true;
947 }
948 if (!(o instanceof GroupStoreKeyMapKey)) {
949 return false;
950 }
951 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
952 return (super.equals(that) &&
953 this.appCookie.equals(that.appCookie));
954 }
955
956 @Override
957 public int hashCode() {
958 int result = 17;
959
960 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
961
962 return result;
963 }
964 }
965
966 private class GroupStoreIdMapKey extends GroupStoreMapKey {
967 private final GroupId groupId;
968 public GroupStoreIdMapKey(DeviceId deviceId,
969 GroupId groupId) {
970 super(deviceId);
971 this.groupId = groupId;
972 }
973
974 @Override
975 public boolean equals(Object o) {
976 if (this == o) {
977 return true;
978 }
979 if (!(o instanceof GroupStoreIdMapKey)) {
980 return false;
981 }
982 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
983 return (super.equals(that) &&
984 this.groupId.equals(that.groupId));
985 }
986
987 @Override
988 public int hashCode() {
989 int result = 17;
990
991 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
992
993 return result;
994 }
995 }
alshabib10580802015-02-18 18:30:33 -0800996}