blob: d6d92a9fdbb3ff01bb019c759b9812c1a943fb2f [file] [log] [blame]
alshabib10580802015-02-18 18:30:33 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.group.impl;
17
Jonathan Hart6ec029a2015-03-24 17:12:35 -070018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.Iterables;
alshabib10580802015-02-18 18:30:33 -080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib10580802015-02-18 18:30:33 -080025import org.apache.felix.scr.annotations.Service;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070026import org.onlab.util.KryoNamespace;
alshabib10580802015-02-18 18:30:33 -080027import org.onlab.util.NewConcurrentHashMap;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.core.DefaultApplicationId;
alshabib10580802015-02-18 18:30:33 -080030import org.onosproject.core.DefaultGroupId;
31import org.onosproject.core.GroupId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070032import org.onosproject.mastership.MastershipService;
alshabib10580802015-02-18 18:30:33 -080033import org.onosproject.net.DeviceId;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070034import org.onosproject.net.MastershipRole;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.flow.instructions.L0ModificationInstruction;
40import org.onosproject.net.flow.instructions.L2ModificationInstruction;
41import org.onosproject.net.flow.instructions.L3ModificationInstruction;
alshabib10580802015-02-18 18:30:33 -080042import org.onosproject.net.group.DefaultGroup;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070043import org.onosproject.net.group.DefaultGroupBucket;
alshabib10580802015-02-18 18:30:33 -080044import org.onosproject.net.group.DefaultGroupDescription;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070045import org.onosproject.net.group.DefaultGroupKey;
alshabib10580802015-02-18 18:30:33 -080046import org.onosproject.net.group.Group;
47import org.onosproject.net.group.Group.GroupState;
48import org.onosproject.net.group.GroupBucket;
49import org.onosproject.net.group.GroupBuckets;
50import org.onosproject.net.group.GroupDescription;
51import org.onosproject.net.group.GroupEvent;
52import org.onosproject.net.group.GroupEvent.Type;
53import org.onosproject.net.group.GroupKey;
54import org.onosproject.net.group.GroupOperation;
55import org.onosproject.net.group.GroupStore;
56import org.onosproject.net.group.GroupStoreDelegate;
57import org.onosproject.net.group.StoredGroupEntry;
58import org.onosproject.store.AbstractStore;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070059import org.onosproject.store.Timestamp;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -070063import org.onosproject.store.impl.MultiValuedTimestamp;
64import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070065import org.onosproject.store.service.ClockService;
66import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapBuilder;
68import org.onosproject.store.service.EventuallyConsistentMapEvent;
69import org.onosproject.store.service.EventuallyConsistentMapListener;
70import org.onosproject.store.service.StorageService;
alshabib10580802015-02-18 18:30:33 -080071import org.slf4j.Logger;
72
Jonathan Hart6ec029a2015-03-24 17:12:35 -070073import java.net.URI;
74import java.util.ArrayList;
75import java.util.HashMap;
76import java.util.List;
77import java.util.Objects;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.ConcurrentMap;
80import java.util.concurrent.ExecutorService;
81import java.util.concurrent.Executors;
82import java.util.concurrent.atomic.AtomicInteger;
83import java.util.concurrent.atomic.AtomicLong;
84import java.util.stream.Collectors;
85
86import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
87import static org.onlab.util.Tools.groupedThreads;
88import static org.slf4j.LoggerFactory.getLogger;
alshabib10580802015-02-18 18:30:33 -080089
/**
 * Manages inventory of group entries using eventually consistent maps;
 * requests for devices mastered by another node are forwarded to that node.
 */
93@Component(immediate = true)
94@Service
95public class DistributedGroupStore
96 extends AbstractStore<GroupEvent, GroupStoreDelegate>
97 implements GroupStore {
98
99 private final Logger log = getLogger(getClass());
100
101 private final int dummyId = 0xffffffff;
102 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
103
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 protected StorageService storageService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700114 protected MastershipService mastershipService;
115
116 // Per device group table with (device id + app cookie) as key
117 private EventuallyConsistentMap<GroupStoreKeyMapKey,
118 StoredGroupEntry> groupStoreEntriesByKey = null;
119 // Per device group table with (device id + group id) as key
120 private EventuallyConsistentMap<GroupStoreIdMapKey,
121 StoredGroupEntry> groupStoreEntriesById = null;
122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> auditPendingReqQueue = null;
alshabib10580802015-02-18 18:30:33 -0800124 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
125 extraneousGroupEntriesById = new ConcurrentHashMap<>();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700126 private ExecutorService messageHandlingExecutor;
127 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
alshabib10580802015-02-18 18:30:33 -0800128
129 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
130 new HashMap<DeviceId, Boolean>();
131
132 private final AtomicInteger groupIdGen = new AtomicInteger();
133
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700134 private KryoNamespace.Builder kryoBuilder = null;
135
alshabib10580802015-02-18 18:30:33 -0800136 @Activate
137 public void activate() {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700138 kryoBuilder = new KryoNamespace.Builder()
139 .register(DefaultGroup.class,
140 DefaultGroupBucket.class,
141 DefaultGroupDescription.class,
142 DefaultGroupKey.class,
143 GroupDescription.Type.class,
144 Group.GroupState.class,
145 GroupBuckets.class,
146 DefaultGroupId.class,
147 GroupStoreMessage.class,
148 GroupStoreMessage.Type.class,
149 UpdateType.class,
150 GroupStoreMessageSubjects.class,
151 MultiValuedTimestamp.class,
152 GroupStoreKeyMapKey.class,
153 GroupStoreIdMapKey.class,
154 GroupStoreMapKey.class
155 )
156 .register(URI.class)
157 .register(DeviceId.class)
158 .register(PortNumber.class)
159 .register(DefaultApplicationId.class)
160 .register(DefaultTrafficTreatment.class,
161 Instructions.DropInstruction.class,
162 Instructions.OutputInstruction.class,
163 Instructions.GroupInstruction.class,
164 Instructions.TableTypeTransition.class,
165 FlowRule.Type.class,
166 L0ModificationInstruction.class,
167 L0ModificationInstruction.L0SubType.class,
168 L0ModificationInstruction.ModLambdaInstruction.class,
169 L2ModificationInstruction.class,
170 L2ModificationInstruction.L2SubType.class,
171 L2ModificationInstruction.ModEtherInstruction.class,
172 L2ModificationInstruction.PushHeaderInstructions.class,
173 L2ModificationInstruction.ModVlanIdInstruction.class,
174 L2ModificationInstruction.ModVlanPcpInstruction.class,
175 L2ModificationInstruction.ModMplsLabelInstruction.class,
176 L2ModificationInstruction.ModMplsTtlInstruction.class,
177 L3ModificationInstruction.class,
178 L3ModificationInstruction.L3SubType.class,
179 L3ModificationInstruction.ModIPInstruction.class,
180 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
181 L3ModificationInstruction.ModTtlInstruction.class,
182 org.onlab.packet.MplsLabel.class
183 )
184 .register(org.onosproject.cluster.NodeId.class)
185 .register(KryoNamespaces.BASIC)
186 .register(KryoNamespaces.MISC);
187
188 messageHandlingExecutor = Executors.
189 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
190 groupedThreads("onos/store/group",
191 "message-handlers"));
192 clusterCommunicator.
193 addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
194 new ClusterGroupMsgHandler(),
195 messageHandlingExecutor);
196
197 log.debug("Creating EC map groupstorekeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700198 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
199 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
200
201 groupStoreEntriesByKey = keyMapBuilder
202 .withName("groupstorekeymap")
203 .withSerializer(kryoBuilder)
204 .withClockService(new GroupStoreLogicalClockManager<>())
205 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700206 log.trace("Current size {}", groupStoreEntriesByKey.size());
207
208 log.debug("Creating EC map groupstoreidmap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700209 EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
210 idMapBuilder = storageService.eventuallyConsistentMapBuilder();
211
212 groupStoreEntriesById = idMapBuilder
213 .withName("groupstoreidmap")
214 .withSerializer(kryoBuilder)
215 .withClockService(new GroupStoreLogicalClockManager<>())
216 .build();
217
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700218 groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
219 log.trace("Current size {}", groupStoreEntriesById.size());
220
221 log.debug("Creating EC map pendinggroupkeymap");
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700222 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
223 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
224
225 auditPendingReqQueue = auditMapBuilder
226 .withName("pendinggroupkeymap")
227 .withSerializer(kryoBuilder)
228 .withClockService(new GroupStoreLogicalClockManager<>())
229 .build();
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700230 log.trace("Current size {}", auditPendingReqQueue.size());
231
alshabib10580802015-02-18 18:30:33 -0800232 log.info("Started");
233 }
234
235 @Deactivate
236 public void deactivate() {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700237 groupStoreEntriesByKey.destroy();
238 groupStoreEntriesById.destroy();
239 auditPendingReqQueue.destroy();
alshabib10580802015-02-18 18:30:33 -0800240 log.info("Stopped");
241 }
242
alshabib10580802015-02-18 18:30:33 -0800243 private static NewConcurrentHashMap<GroupId, Group>
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700244 lazyEmptyExtraneousGroupIdTable() {
alshabib10580802015-02-18 18:30:33 -0800245 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
246 }
247
248 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700249 * Returns the group store eventual consistent key map.
alshabib10580802015-02-18 18:30:33 -0800250 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700251 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800252 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700253 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
254 getGroupStoreKeyMap() {
255 return groupStoreEntriesByKey;
alshabib10580802015-02-18 18:30:33 -0800256 }
257
258 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700259 * Returns the group store eventual consistent id map.
alshabib10580802015-02-18 18:30:33 -0800260 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700261 * @return Map representing group id table.
alshabib10580802015-02-18 18:30:33 -0800262 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700263 private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
264 getGroupStoreIdMap() {
265 return groupStoreEntriesById;
alshabib10580802015-02-18 18:30:33 -0800266 }
267
268 /**
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700269 * Returns the pending group request table.
alshabib10580802015-02-18 18:30:33 -0800270 *
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700271 * @return Map representing group key table.
alshabib10580802015-02-18 18:30:33 -0800272 */
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700273 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
274 getPendingGroupKeyTable() {
275 return auditPendingReqQueue;
alshabib10580802015-02-18 18:30:33 -0800276 }
277
278 /**
279 * Returns the extraneous group id table for specified device.
280 *
281 * @param deviceId identifier of the device
282 * @return Map representing group key table of given device.
283 */
284 private ConcurrentMap<GroupId, Group>
285 getExtraneousGroupIdTable(DeviceId deviceId) {
286 return createIfAbsentUnchecked(extraneousGroupEntriesById,
287 deviceId,
288 lazyEmptyExtraneousGroupIdTable());
289 }
290
291 /**
292 * Returns the number of groups for the specified device in the store.
293 *
294 * @return number of groups for the specified device
295 */
296 @Override
297 public int getGroupCount(DeviceId deviceId) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700298 return (getGroups(deviceId) != null) ?
299 Iterables.size(getGroups(deviceId)) : 0;
alshabib10580802015-02-18 18:30:33 -0800300 }
301
302 /**
303 * Returns the groups associated with a device.
304 *
305 * @param deviceId the device ID
306 *
307 * @return the group entries
308 */
309 @Override
310 public Iterable<Group> getGroups(DeviceId deviceId) {
311 // flatten and make iterator unmodifiable
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700312 log.trace("getGroups: for device {} total number of groups {}",
313 deviceId, getGroupStoreKeyMap().values().size());
314 return FluentIterable.from(getGroupStoreKeyMap().values())
315 .filter(input -> input.deviceId().equals(deviceId))
316 .transform(input -> input);
alshabib10580802015-02-18 18:30:33 -0800317 }
318
319 /**
320 * Returns the stored group entry.
321 *
322 * @param deviceId the device ID
323 * @param appCookie the group key
324 *
325 * @return a group associated with the key
326 */
327 @Override
328 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700329 return getStoredGroupEntry(deviceId, appCookie);
330 }
331
332 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
333 GroupKey appCookie) {
334 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
335 appCookie));
336 }
337
338 @Override
339 public Group getGroup(DeviceId deviceId, GroupId groupId) {
340 return getStoredGroupEntry(deviceId, groupId);
341 }
342
343 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
344 GroupId groupId) {
345 return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
346 groupId));
alshabib10580802015-02-18 18:30:33 -0800347 }
348
349 private int getFreeGroupIdValue(DeviceId deviceId) {
350 int freeId = groupIdGen.incrementAndGet();
351
352 while (true) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700353 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
alshabib10580802015-02-18 18:30:33 -0800354 if (existing == null) {
355 existing = (
356 extraneousGroupEntriesById.get(deviceId) != null) ?
357 extraneousGroupEntriesById.get(deviceId).
358 get(new DefaultGroupId(freeId)) :
359 null;
360 }
361 if (existing != null) {
362 freeId = groupIdGen.incrementAndGet();
363 } else {
364 break;
365 }
366 }
367 return freeId;
368 }
369
370 /**
371 * Stores a new group entry using the information from group description.
372 *
373 * @param groupDesc group description to be used to create group entry
374 */
375 @Override
376 public void storeGroupDescription(GroupDescription groupDesc) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700377 log.trace("In storeGroupDescription");
alshabib10580802015-02-18 18:30:33 -0800378 // Check if a group is existing with the same key
379 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700380 log.warn("Group already exists with the same key {}",
381 groupDesc.appCookie());
alshabib10580802015-02-18 18:30:33 -0800382 return;
383 }
384
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700385 // Check if group to be created by a remote instance
386 if (mastershipService.getLocalRole(
387 groupDesc.deviceId()) != MastershipRole.MASTER) {
388 log.debug("Device {} local role is not MASTER",
389 groupDesc.deviceId());
390 GroupStoreMessage groupOp = GroupStoreMessage.
391 createGroupAddRequestMsg(groupDesc.deviceId(),
392 groupDesc);
393 ClusterMessage message = new ClusterMessage(
394 clusterService.getLocalNode().id(),
395 GroupStoreMessageSubjects.
396 REMOTE_GROUP_OP_REQUEST,
397 kryoBuilder.build().serialize(groupOp));
398 if (!clusterCommunicator.unicast(message,
399 mastershipService.
400 getMasterFor(
401 groupDesc.deviceId()))) {
402 log.warn("Failed to send request to master: {} to {}",
403 message,
404 mastershipService.getMasterFor(groupDesc.deviceId()));
405 //TODO: Send Group operation failure event
406 }
407 log.debug("Sent Group operation request for device {} "
408 + "to remote MASTER {}",
409 groupDesc.deviceId(),
410 mastershipService.getMasterFor(groupDesc.deviceId()));
alshabib10580802015-02-18 18:30:33 -0800411 return;
412 }
413
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700414 log.debug("Store group for device {} is getting handled locally",
415 groupDesc.deviceId());
alshabib10580802015-02-18 18:30:33 -0800416 storeGroupDescriptionInternal(groupDesc);
417 }
418
419 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
420 // Check if a group is existing with the same key
421 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
422 return;
423 }
424
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700425 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
426 // Device group audit has not completed yet
427 // Add this group description to pending group key table
428 // Create a group entry object with Dummy Group ID
429 log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
430 + "pending...Queuing Group ADD request",
431 groupDesc.deviceId());
432 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
433 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
434 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
435 getPendingGroupKeyTable();
436 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
437 groupDesc.appCookie()),
438 group);
439 return;
440 }
441
alshabib10580802015-02-18 18:30:33 -0800442 // Get a new group identifier
443 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
444 // Create a group entry object
445 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700446 // Insert the newly created group entry into key and id maps
447 getGroupStoreKeyMap().
448 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
449 groupDesc.appCookie()), group);
450 getGroupStoreIdMap().
451 put(new GroupStoreIdMapKey(groupDesc.deviceId(),
452 id), group);
alshabib10580802015-02-18 18:30:33 -0800453 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
454 group));
455 }
456
457 /**
458 * Updates the existing group entry with the information
459 * from group description.
460 *
461 * @param deviceId the device ID
462 * @param oldAppCookie the current group key
463 * @param type update type
464 * @param newBuckets group buckets for updates
465 * @param newAppCookie optional new group key
466 */
467 @Override
468 public void updateGroupDescription(DeviceId deviceId,
469 GroupKey oldAppCookie,
470 UpdateType type,
471 GroupBuckets newBuckets,
472 GroupKey newAppCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700473 // Check if group update to be done by a remote instance
474 if (mastershipService.
475 getLocalRole(deviceId) != MastershipRole.MASTER) {
476 GroupStoreMessage groupOp = GroupStoreMessage.
477 createGroupUpdateRequestMsg(deviceId,
478 oldAppCookie,
479 type,
480 newBuckets,
481 newAppCookie);
482 ClusterMessage message =
483 new ClusterMessage(clusterService.getLocalNode().id(),
484 GroupStoreMessageSubjects.
485 REMOTE_GROUP_OP_REQUEST,
486 kryoBuilder.build().serialize(groupOp));
487 if (!clusterCommunicator.unicast(message,
488 mastershipService.
489 getMasterFor(deviceId))) {
490 log.warn("Failed to send request to master: {} to {}",
491 message,
492 mastershipService.getMasterFor(deviceId));
493 //TODO: Send Group operation failure event
494 }
495 return;
496 }
497 updateGroupDescriptionInternal(deviceId,
498 oldAppCookie,
499 type,
500 newBuckets,
501 newAppCookie);
502 }
503
504 private void updateGroupDescriptionInternal(DeviceId deviceId,
505 GroupKey oldAppCookie,
506 UpdateType type,
507 GroupBuckets newBuckets,
508 GroupKey newAppCookie) {
alshabib10580802015-02-18 18:30:33 -0800509 // Check if a group is existing with the provided key
510 Group oldGroup = getGroup(deviceId, oldAppCookie);
511 if (oldGroup == null) {
512 return;
513 }
514
515 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
516 type,
517 newBuckets);
518 if (newBucketList != null) {
519 // Create a new group object from the old group
520 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
521 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
522 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
523 oldGroup.deviceId(),
524 oldGroup.type(),
525 updatedBuckets,
526 newCookie,
527 oldGroup.appId());
528 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
529 updatedGroupDesc);
530 newGroup.setState(GroupState.PENDING_UPDATE);
531 newGroup.setLife(oldGroup.life());
532 newGroup.setPackets(oldGroup.packets());
533 newGroup.setBytes(oldGroup.bytes());
534 // Remove the old entry from maps and add new entry using new key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700535 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
536 oldGroup.appCookie()));
537 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
538 oldGroup.id()));
539 getGroupStoreKeyMap().
540 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
541 newGroup.appCookie()), newGroup);
542 getGroupStoreIdMap().
543 put(new GroupStoreIdMapKey(newGroup.deviceId(),
544 newGroup.id()), newGroup);
545
alshabib10580802015-02-18 18:30:33 -0800546 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
547 }
548 }
549
550 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
551 UpdateType type,
552 GroupBuckets buckets) {
553 GroupBuckets oldBuckets = oldGroup.buckets();
554 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
555 oldBuckets.buckets());
556 boolean groupDescUpdated = false;
557
558 if (type == UpdateType.ADD) {
559 // Check if the any of the new buckets are part of
560 // the old bucket list
561 for (GroupBucket addBucket:buckets.buckets()) {
562 if (!newBucketList.contains(addBucket)) {
563 newBucketList.add(addBucket);
564 groupDescUpdated = true;
565 }
566 }
567 } else if (type == UpdateType.REMOVE) {
568 // Check if the to be removed buckets are part of the
569 // old bucket list
570 for (GroupBucket removeBucket:buckets.buckets()) {
571 if (newBucketList.contains(removeBucket)) {
572 newBucketList.remove(removeBucket);
573 groupDescUpdated = true;
574 }
575 }
576 }
577
578 if (groupDescUpdated) {
579 return newBucketList;
580 } else {
581 return null;
582 }
583 }
584
585 /**
586 * Triggers deleting the existing group entry.
587 *
588 * @param deviceId the device ID
589 * @param appCookie the group key
590 */
591 @Override
592 public void deleteGroupDescription(DeviceId deviceId,
593 GroupKey appCookie) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700594 // Check if group to be deleted by a remote instance
595 if (mastershipService.
596 getLocalRole(deviceId) != MastershipRole.MASTER) {
597 GroupStoreMessage groupOp = GroupStoreMessage.
598 createGroupDeleteRequestMsg(deviceId,
599 appCookie);
600 ClusterMessage message =
601 new ClusterMessage(clusterService.getLocalNode().id(),
602 GroupStoreMessageSubjects.
603 REMOTE_GROUP_OP_REQUEST,
604 kryoBuilder.build().serialize(groupOp));
605 if (!clusterCommunicator.unicast(message,
606 mastershipService.
607 getMasterFor(deviceId))) {
608 log.warn("Failed to send request to master: {} to {}",
609 message,
610 mastershipService.getMasterFor(deviceId));
611 //TODO: Send Group operation failure event
612 }
613 return;
614 }
615 deleteGroupDescriptionInternal(deviceId, appCookie);
616 }
617
618 private void deleteGroupDescriptionInternal(DeviceId deviceId,
619 GroupKey appCookie) {
alshabib10580802015-02-18 18:30:33 -0800620 // Check if a group is existing with the provided key
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700621 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
alshabib10580802015-02-18 18:30:33 -0800622 if (existing == null) {
623 return;
624 }
625
626 synchronized (existing) {
627 existing.setState(GroupState.PENDING_DELETE);
628 }
629 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
630 }
631
632 /**
633 * Stores a new group entry, or updates an existing entry.
634 *
635 * @param group group entry
636 */
637 @Override
638 public void addOrUpdateGroupEntry(Group group) {
639 // check if this new entry is an update to an existing entry
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700640 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
641 group.id());
alshabib10580802015-02-18 18:30:33 -0800642 GroupEvent event = null;
643
644 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700645 log.trace("addOrUpdateGroupEntry: updating group "
646 + "entry {} in device {}",
647 group.id(),
648 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800649 synchronized (existing) {
650 existing.setLife(group.life());
651 existing.setPackets(group.packets());
652 existing.setBytes(group.bytes());
653 if (existing.state() == GroupState.PENDING_ADD) {
654 existing.setState(GroupState.ADDED);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700655 existing.setIsGroupStateAddedFirstTime(true);
alshabib10580802015-02-18 18:30:33 -0800656 event = new GroupEvent(Type.GROUP_ADDED, existing);
657 } else {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700658 existing.setState(GroupState.ADDED);
659 existing.setIsGroupStateAddedFirstTime(false);
alshabib10580802015-02-18 18:30:33 -0800660 event = new GroupEvent(Type.GROUP_UPDATED, existing);
661 }
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700662 //Re-PUT map entries to trigger map update events
663 getGroupStoreKeyMap().
664 put(new GroupStoreKeyMapKey(existing.deviceId(),
665 existing.appCookie()), existing);
666 getGroupStoreIdMap().
667 put(new GroupStoreIdMapKey(existing.deviceId(),
668 existing.id()), existing);
alshabib10580802015-02-18 18:30:33 -0800669 }
670 }
671
672 if (event != null) {
673 notifyDelegate(event);
674 }
675 }
676
677 /**
678 * Removes the group entry from store.
679 *
680 * @param group group entry
681 */
682 @Override
683 public void removeGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700684 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
685 group.id());
alshabib10580802015-02-18 18:30:33 -0800686
687 if (existing != null) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700688 log.trace("removeGroupEntry: removing group "
689 + "entry {} in device {}",
690 group.id(),
691 group.deviceId());
692 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
693 existing.appCookie()));
694 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
695 existing.id()));
alshabib10580802015-02-18 18:30:33 -0800696 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
697 }
698 }
699
700 @Override
701 public void deviceInitialAuditCompleted(DeviceId deviceId,
702 boolean completed) {
703 synchronized (deviceAuditStatus) {
704 if (completed) {
705 log.debug("deviceInitialAuditCompleted: AUDIT "
706 + "completed for device {}", deviceId);
707 deviceAuditStatus.put(deviceId, true);
708 // Execute all pending group requests
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700709 List<StoredGroupEntry> pendingGroupRequests =
710 getPendingGroupKeyTable().values()
711 .stream()
712 .filter(g-> g.deviceId().equals(deviceId))
713 .collect(Collectors.toList());
714 log.trace("deviceInitialAuditCompleted: processing "
715 + "pending group add requests for device {} and "
716 + "number of pending requests {}",
717 deviceId,
718 pendingGroupRequests.size());
719 for (Group group:pendingGroupRequests) {
alshabib10580802015-02-18 18:30:33 -0800720 GroupDescription tmp = new DefaultGroupDescription(
721 group.deviceId(),
722 group.type(),
723 group.buckets(),
724 group.appCookie(),
725 group.appId());
726 storeGroupDescriptionInternal(tmp);
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700727 getPendingGroupKeyTable().
728 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
alshabib10580802015-02-18 18:30:33 -0800729 }
alshabib10580802015-02-18 18:30:33 -0800730 } else {
731 if (deviceAuditStatus.get(deviceId)) {
732 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
733 + "status for device {}", deviceId);
734 deviceAuditStatus.put(deviceId, false);
735 }
736 }
737 }
738 }
739
740 @Override
741 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
742 synchronized (deviceAuditStatus) {
743 return (deviceAuditStatus.get(deviceId) != null)
744 ? deviceAuditStatus.get(deviceId) : false;
745 }
746 }
747
748 @Override
749 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
750
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700751 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
752 operation.groupId());
alshabib10580802015-02-18 18:30:33 -0800753
754 if (existing == null) {
755 log.warn("No group entry with ID {} found ", operation.groupId());
756 return;
757 }
758
759 switch (operation.opType()) {
760 case ADD:
761 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
762 break;
763 case MODIFY:
764 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
765 break;
766 case DELETE:
767 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
768 break;
769 default:
770 log.warn("Unknown group operation type {}", operation.opType());
771 }
772
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700773 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
774 existing.appCookie()));
775 getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
776 existing.id()));
alshabib10580802015-02-18 18:30:33 -0800777 }
778
779 @Override
780 public void addOrUpdateExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700781 log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
782 + "group entry {} in device {}",
783 group.id(),
784 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800785 ConcurrentMap<GroupId, Group> extraneousIdTable =
786 getExtraneousGroupIdTable(group.deviceId());
787 extraneousIdTable.put(group.id(), group);
788 // Check the reference counter
789 if (group.referenceCount() == 0) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700790 log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
791 + "counter is zero and triggering remove",
792 group.id(),
793 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800794 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
795 }
796 }
797
798 @Override
799 public void removeExtraneousGroupEntry(Group group) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700800 log.trace("removeExtraneousGroupEntry: remove extraneous "
801 + "group entry {} of device {} from store",
802 group.id(),
803 group.deviceId());
alshabib10580802015-02-18 18:30:33 -0800804 ConcurrentMap<GroupId, Group> extraneousIdTable =
805 getExtraneousGroupIdTable(group.deviceId());
806 extraneousIdTable.remove(group.id());
807 }
808
809 @Override
810 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
811 // flatten and make iterator unmodifiable
812 return FluentIterable.from(
813 getExtraneousGroupIdTable(deviceId).values());
814 }
815
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700816 /**
817 * ClockService that generates wallclock based timestamps.
818 */
819 private class GroupStoreLogicalClockManager<T, U>
820 implements ClockService<T, U> {
alshabib10580802015-02-18 18:30:33 -0800821
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700822 private final AtomicLong sequenceNumber = new AtomicLong(0);
823
824 @Override
825 public Timestamp getTimestamp(T t1, U u1) {
826 return new MultiValuedTimestamp<>(System.currentTimeMillis(),
827 sequenceNumber.getAndIncrement());
828 }
829 }
830
831 /**
832 * Map handler to receive any events when the group map is updated.
833 */
834 private class GroupStoreIdMapListener implements
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700835 EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700836
837 @Override
838 public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700839 StoredGroupEntry> mapEvent) {
Srikanth Vavilapalli717361f2015-03-16 12:06:04 -0700840 GroupEvent groupEvent = null;
841 log.trace("GroupStoreIdMapListener: received groupid map event {}",
842 mapEvent.type());
843 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
844 log.trace("GroupIdMapListener: Received PUT event");
845 if (mapEvent.value().state() == Group.GroupState.ADDED) {
846 if (mapEvent.value().isGroupStateAddedFirstTime()) {
847 groupEvent = new GroupEvent(Type.GROUP_ADDED,
848 mapEvent.value());
849 log.trace("GroupIdMapListener: Received first time "
850 + "GROUP_ADDED state update");
851 } else {
852 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
853 mapEvent.value());
854 log.trace("GroupIdMapListener: Received following "
855 + "GROUP_ADDED state update");
856 }
857 }
858 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
859 log.trace("GroupIdMapListener: Received REMOVE event");
860 groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
861 }
862
863 if (groupEvent != null) {
864 notifyDelegate(groupEvent);
865 }
866 }
867 }
868 /**
869 * Message handler to receive messages from group subsystems of
870 * other cluster members.
871 */
872 private final class ClusterGroupMsgHandler
873 implements ClusterMessageHandler {
874 @Override
875 public void handle(ClusterMessage message) {
876 log.trace("ClusterGroupMsgHandler: received remote group message");
877 if (message.subject() ==
878 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
879 GroupStoreMessage groupOp = kryoBuilder.
880 build().deserialize(message.payload());
881 log.trace("received remote group operation request");
882 if (!(mastershipService.
883 getLocalRole(groupOp.deviceId()) !=
884 MastershipRole.MASTER)) {
885 log.warn("ClusterGroupMsgHandler: This node is not "
886 + "MASTER for device {}", groupOp.deviceId());
887 return;
888 }
889 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
890 log.trace("processing remote group "
891 + "add operation request");
892 storeGroupDescriptionInternal(groupOp.groupDesc());
893 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
894 log.trace("processing remote group "
895 + "update operation request");
896 updateGroupDescriptionInternal(groupOp.deviceId(),
897 groupOp.appCookie(),
898 groupOp.updateType(),
899 groupOp.updateBuckets(),
900 groupOp.newAppCookie());
901 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
902 log.trace("processing remote group "
903 + "delete operation request");
904 deleteGroupDescriptionInternal(groupOp.deviceId(),
905 groupOp.appCookie());
906 }
907 }
908 }
909 }
910
911 /**
912 * Flattened map key to be used to store group entries.
913 */
914 private class GroupStoreMapKey {
915 private final DeviceId deviceId;
916
917 public GroupStoreMapKey(DeviceId deviceId) {
918 this.deviceId = deviceId;
919 }
920
921 @Override
922 public boolean equals(Object o) {
923 if (this == o) {
924 return true;
925 }
926 if (!(o instanceof GroupStoreMapKey)) {
927 return false;
928 }
929 GroupStoreMapKey that = (GroupStoreMapKey) o;
930 return this.deviceId.equals(that.deviceId);
931 }
932
933 @Override
934 public int hashCode() {
935 int result = 17;
936
937 result = 31 * result + Objects.hash(this.deviceId);
938
939 return result;
940 }
941 }
942
943 private class GroupStoreKeyMapKey extends GroupStoreMapKey {
944 private final GroupKey appCookie;
945 public GroupStoreKeyMapKey(DeviceId deviceId,
946 GroupKey appCookie) {
947 super(deviceId);
948 this.appCookie = appCookie;
949 }
950
951 @Override
952 public boolean equals(Object o) {
953 if (this == o) {
954 return true;
955 }
956 if (!(o instanceof GroupStoreKeyMapKey)) {
957 return false;
958 }
959 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
960 return (super.equals(that) &&
961 this.appCookie.equals(that.appCookie));
962 }
963
964 @Override
965 public int hashCode() {
966 int result = 17;
967
968 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
969
970 return result;
971 }
972 }
973
974 private class GroupStoreIdMapKey extends GroupStoreMapKey {
975 private final GroupId groupId;
976 public GroupStoreIdMapKey(DeviceId deviceId,
977 GroupId groupId) {
978 super(deviceId);
979 this.groupId = groupId;
980 }
981
982 @Override
983 public boolean equals(Object o) {
984 if (this == o) {
985 return true;
986 }
987 if (!(o instanceof GroupStoreIdMapKey)) {
988 return false;
989 }
990 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
991 return (super.equals(that) &&
992 this.groupId.equals(that.groupId));
993 }
994
995 @Override
996 public int hashCode() {
997 int result = 17;
998
999 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1000
1001 return result;
1002 }
1003 }
alshabib10580802015-02-18 18:30:33 -08001004}