blob: c914784537d1e98763c75341edeb756be134ea87 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.group.impl;
17
18import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
19import static org.slf4j.LoggerFactory.getLogger;
20
21import java.util.ArrayList;
22import java.util.HashMap;
23import java.util.List;
24import java.util.concurrent.ConcurrentHashMap;
25import java.util.concurrent.ConcurrentMap;
26import java.util.concurrent.atomic.AtomicInteger;
27
28import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.NewConcurrentHashMap;
33import org.onosproject.core.DefaultGroupId;
34import org.onosproject.core.GroupId;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.group.DefaultGroup;
37import org.onosproject.net.group.DefaultGroupDescription;
38import org.onosproject.net.group.Group;
39import org.onosproject.net.group.Group.GroupState;
40import org.onosproject.net.group.GroupBucket;
41import org.onosproject.net.group.GroupBuckets;
42import org.onosproject.net.group.GroupDescription;
43import org.onosproject.net.group.GroupEvent;
44import org.onosproject.net.group.GroupEvent.Type;
45import org.onosproject.net.group.GroupKey;
46import org.onosproject.net.group.GroupOperation;
47import org.onosproject.net.group.GroupStore;
48import org.onosproject.net.group.GroupStoreDelegate;
49import org.onosproject.net.group.StoredGroupEntry;
50import org.onosproject.store.AbstractStore;
51import org.slf4j.Logger;
52
53import com.google.common.base.Function;
54import com.google.common.collect.FluentIterable;
55
56/**
57 * Manages inventory of group entries using trivial in-memory implementation.
58 */
59@Component(immediate = true)
60@Service
61public class DistributedGroupStore
62 extends AbstractStore<GroupEvent, GroupStoreDelegate>
63 implements GroupStore {
64
65 private final Logger log = getLogger(getClass());
66
67 private final int dummyId = 0xffffffff;
68 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
69
70 // inner Map is per device group table
71 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
72 groupEntriesByKey = new ConcurrentHashMap<>();
73 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
74 groupEntriesById = new ConcurrentHashMap<>();
75 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
76 pendingGroupEntriesByKey = new ConcurrentHashMap<>();
77 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
78 extraneousGroupEntriesById = new ConcurrentHashMap<>();
79
80 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
81 new HashMap<DeviceId, Boolean>();
82
83 private final AtomicInteger groupIdGen = new AtomicInteger();
84
85 @Activate
86 public void activate() {
87 log.info("Started");
88 }
89
90 @Deactivate
91 public void deactivate() {
92 groupEntriesByKey.clear();
93 groupEntriesById.clear();
94 log.info("Stopped");
95 }
96
97 private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
98 lazyEmptyGroupKeyTable() {
99 return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
100 }
101
102 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
103 lazyEmptyGroupIdTable() {
104 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
105 }
106
107 private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
108 lazyEmptyPendingGroupKeyTable() {
109 return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
110 }
111
112 private static NewConcurrentHashMap<GroupId, Group>
113 lazyEmptyExtraneousGroupIdTable() {
114 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
115 }
116
117 /**
118 * Returns the group key table for specified device.
119 *
120 * @param deviceId identifier of the device
121 * @return Map representing group key table of given device.
122 */
123 private ConcurrentMap<GroupKey, StoredGroupEntry> getGroupKeyTable(DeviceId deviceId) {
124 return createIfAbsentUnchecked(groupEntriesByKey,
125 deviceId, lazyEmptyGroupKeyTable());
126 }
127
128 /**
129 * Returns the group id table for specified device.
130 *
131 * @param deviceId identifier of the device
132 * @return Map representing group key table of given device.
133 */
134 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
135 return createIfAbsentUnchecked(groupEntriesById,
136 deviceId, lazyEmptyGroupIdTable());
137 }
138
139 /**
140 * Returns the pending group key table for specified device.
141 *
142 * @param deviceId identifier of the device
143 * @return Map representing group key table of given device.
144 */
145 private ConcurrentMap<GroupKey, StoredGroupEntry>
146 getPendingGroupKeyTable(DeviceId deviceId) {
147 return createIfAbsentUnchecked(pendingGroupEntriesByKey,
148 deviceId, lazyEmptyPendingGroupKeyTable());
149 }
150
151 /**
152 * Returns the extraneous group id table for specified device.
153 *
154 * @param deviceId identifier of the device
155 * @return Map representing group key table of given device.
156 */
157 private ConcurrentMap<GroupId, Group>
158 getExtraneousGroupIdTable(DeviceId deviceId) {
159 return createIfAbsentUnchecked(extraneousGroupEntriesById,
160 deviceId,
161 lazyEmptyExtraneousGroupIdTable());
162 }
163
164 /**
165 * Returns the number of groups for the specified device in the store.
166 *
167 * @return number of groups for the specified device
168 */
169 @Override
170 public int getGroupCount(DeviceId deviceId) {
171 return (groupEntriesByKey.get(deviceId) != null) ?
172 groupEntriesByKey.get(deviceId).size() : 0;
173 }
174
175 /**
176 * Returns the groups associated with a device.
177 *
178 * @param deviceId the device ID
179 *
180 * @return the group entries
181 */
182 @Override
183 public Iterable<Group> getGroups(DeviceId deviceId) {
184 // flatten and make iterator unmodifiable
185 return FluentIterable.from(getGroupKeyTable(deviceId).values())
186 .transform(
187 new Function<StoredGroupEntry, Group>() {
188
189 @Override
190 public Group apply(
191 StoredGroupEntry input) {
192 return input;
193 }
194 });
195 }
196
197 /**
198 * Returns the stored group entry.
199 *
200 * @param deviceId the device ID
201 * @param appCookie the group key
202 *
203 * @return a group associated with the key
204 */
205 @Override
206 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
207 return (groupEntriesByKey.get(deviceId) != null) ?
208 groupEntriesByKey.get(deviceId).get(appCookie) :
209 null;
210 }
211
212 private int getFreeGroupIdValue(DeviceId deviceId) {
213 int freeId = groupIdGen.incrementAndGet();
214
215 while (true) {
216 Group existing = (
217 groupEntriesById.get(deviceId) != null) ?
218 groupEntriesById.get(deviceId).get(new DefaultGroupId(freeId)) :
219 null;
220 if (existing == null) {
221 existing = (
222 extraneousGroupEntriesById.get(deviceId) != null) ?
223 extraneousGroupEntriesById.get(deviceId).
224 get(new DefaultGroupId(freeId)) :
225 null;
226 }
227 if (existing != null) {
228 freeId = groupIdGen.incrementAndGet();
229 } else {
230 break;
231 }
232 }
233 return freeId;
234 }
235
236 /**
237 * Stores a new group entry using the information from group description.
238 *
239 * @param groupDesc group description to be used to create group entry
240 */
241 @Override
242 public void storeGroupDescription(GroupDescription groupDesc) {
243 // Check if a group is existing with the same key
244 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
245 return;
246 }
247
248 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
249 // Device group audit has not completed yet
250 // Add this group description to pending group key table
251 // Create a group entry object with Dummy Group ID
252 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
253 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
254 ConcurrentMap<GroupKey, StoredGroupEntry> pendingKeyTable =
255 getPendingGroupKeyTable(groupDesc.deviceId());
256 pendingKeyTable.put(groupDesc.appCookie(), group);
257 return;
258 }
259
260 storeGroupDescriptionInternal(groupDesc);
261 }
262
263 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
264 // Check if a group is existing with the same key
265 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
266 return;
267 }
268
269 // Get a new group identifier
270 GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
271 // Create a group entry object
272 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
273 // Insert the newly created group entry into concurrent key and id maps
274 ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
275 getGroupKeyTable(groupDesc.deviceId());
276 keyTable.put(groupDesc.appCookie(), group);
277 ConcurrentMap<GroupId, StoredGroupEntry> idTable =
278 getGroupIdTable(groupDesc.deviceId());
279 idTable.put(id, group);
280 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
281 group));
282 }
283
284 /**
285 * Updates the existing group entry with the information
286 * from group description.
287 *
288 * @param deviceId the device ID
289 * @param oldAppCookie the current group key
290 * @param type update type
291 * @param newBuckets group buckets for updates
292 * @param newAppCookie optional new group key
293 */
294 @Override
295 public void updateGroupDescription(DeviceId deviceId,
296 GroupKey oldAppCookie,
297 UpdateType type,
298 GroupBuckets newBuckets,
299 GroupKey newAppCookie) {
300 // Check if a group is existing with the provided key
301 Group oldGroup = getGroup(deviceId, oldAppCookie);
302 if (oldGroup == null) {
303 return;
304 }
305
306 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
307 type,
308 newBuckets);
309 if (newBucketList != null) {
310 // Create a new group object from the old group
311 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
312 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
313 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
314 oldGroup.deviceId(),
315 oldGroup.type(),
316 updatedBuckets,
317 newCookie,
318 oldGroup.appId());
319 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
320 updatedGroupDesc);
321 newGroup.setState(GroupState.PENDING_UPDATE);
322 newGroup.setLife(oldGroup.life());
323 newGroup.setPackets(oldGroup.packets());
324 newGroup.setBytes(oldGroup.bytes());
325 // Remove the old entry from maps and add new entry using new key
326 ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
327 getGroupKeyTable(oldGroup.deviceId());
328 ConcurrentMap<GroupId, StoredGroupEntry> idTable =
329 getGroupIdTable(oldGroup.deviceId());
330 keyTable.remove(oldGroup.appCookie());
331 idTable.remove(oldGroup.id());
332 keyTable.put(newGroup.appCookie(), newGroup);
333 idTable.put(newGroup.id(), newGroup);
334 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
335 }
336 }
337
338 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
339 UpdateType type,
340 GroupBuckets buckets) {
341 GroupBuckets oldBuckets = oldGroup.buckets();
342 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
343 oldBuckets.buckets());
344 boolean groupDescUpdated = false;
345
346 if (type == UpdateType.ADD) {
347 // Check if the any of the new buckets are part of
348 // the old bucket list
349 for (GroupBucket addBucket:buckets.buckets()) {
350 if (!newBucketList.contains(addBucket)) {
351 newBucketList.add(addBucket);
352 groupDescUpdated = true;
353 }
354 }
355 } else if (type == UpdateType.REMOVE) {
356 // Check if the to be removed buckets are part of the
357 // old bucket list
358 for (GroupBucket removeBucket:buckets.buckets()) {
359 if (newBucketList.contains(removeBucket)) {
360 newBucketList.remove(removeBucket);
361 groupDescUpdated = true;
362 }
363 }
364 }
365
366 if (groupDescUpdated) {
367 return newBucketList;
368 } else {
369 return null;
370 }
371 }
372
373 /**
374 * Triggers deleting the existing group entry.
375 *
376 * @param deviceId the device ID
377 * @param appCookie the group key
378 */
379 @Override
380 public void deleteGroupDescription(DeviceId deviceId,
381 GroupKey appCookie) {
382 // Check if a group is existing with the provided key
383 StoredGroupEntry existing = (groupEntriesByKey.get(deviceId) != null) ?
384 groupEntriesByKey.get(deviceId).get(appCookie) :
385 null;
386 if (existing == null) {
387 return;
388 }
389
390 synchronized (existing) {
391 existing.setState(GroupState.PENDING_DELETE);
392 }
393 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
394 }
395
396 /**
397 * Stores a new group entry, or updates an existing entry.
398 *
399 * @param group group entry
400 */
401 @Override
402 public void addOrUpdateGroupEntry(Group group) {
403 // check if this new entry is an update to an existing entry
404 StoredGroupEntry existing = (groupEntriesById.get(
405 group.deviceId()) != null) ?
406 groupEntriesById.get(group.deviceId()).get(group.id()) :
407 null;
408 GroupEvent event = null;
409
410 if (existing != null) {
411 synchronized (existing) {
412 existing.setLife(group.life());
413 existing.setPackets(group.packets());
414 existing.setBytes(group.bytes());
415 if (existing.state() == GroupState.PENDING_ADD) {
416 existing.setState(GroupState.ADDED);
417 event = new GroupEvent(Type.GROUP_ADDED, existing);
418 } else {
419 if (existing.state() == GroupState.PENDING_UPDATE) {
420 existing.setState(GroupState.PENDING_UPDATE);
421 }
422 event = new GroupEvent(Type.GROUP_UPDATED, existing);
423 }
424 }
425 }
426
427 if (event != null) {
428 notifyDelegate(event);
429 }
430 }
431
432 /**
433 * Removes the group entry from store.
434 *
435 * @param group group entry
436 */
437 @Override
438 public void removeGroupEntry(Group group) {
439 StoredGroupEntry existing = (groupEntriesById.get(
440 group.deviceId()) != null) ?
441 groupEntriesById.get(group.deviceId()).get(group.id()) :
442 null;
443
444 if (existing != null) {
445 ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
446 getGroupKeyTable(existing.deviceId());
447 ConcurrentMap<GroupId, StoredGroupEntry> idTable =
448 getGroupIdTable(existing.deviceId());
449 idTable.remove(existing.id());
450 keyTable.remove(existing.appCookie());
451 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
452 }
453 }
454
455 @Override
456 public void deviceInitialAuditCompleted(DeviceId deviceId,
457 boolean completed) {
458 synchronized (deviceAuditStatus) {
459 if (completed) {
460 log.debug("deviceInitialAuditCompleted: AUDIT "
461 + "completed for device {}", deviceId);
462 deviceAuditStatus.put(deviceId, true);
463 // Execute all pending group requests
464 ConcurrentMap<GroupKey, StoredGroupEntry> pendingGroupRequests =
465 getPendingGroupKeyTable(deviceId);
466 for (Group group:pendingGroupRequests.values()) {
467 GroupDescription tmp = new DefaultGroupDescription(
468 group.deviceId(),
469 group.type(),
470 group.buckets(),
471 group.appCookie(),
472 group.appId());
473 storeGroupDescriptionInternal(tmp);
474 }
475 getPendingGroupKeyTable(deviceId).clear();
476 } else {
477 if (deviceAuditStatus.get(deviceId)) {
478 log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
479 + "status for device {}", deviceId);
480 deviceAuditStatus.put(deviceId, false);
481 }
482 }
483 }
484 }
485
486 @Override
487 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
488 synchronized (deviceAuditStatus) {
489 return (deviceAuditStatus.get(deviceId) != null)
490 ? deviceAuditStatus.get(deviceId) : false;
491 }
492 }
493
494 @Override
495 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
496
497 StoredGroupEntry existing = (groupEntriesById.get(
498 deviceId) != null) ?
499 groupEntriesById.get(deviceId).get(operation.groupId()) :
500 null;
501
502 if (existing == null) {
503 log.warn("No group entry with ID {} found ", operation.groupId());
504 return;
505 }
506
507 switch (operation.opType()) {
508 case ADD:
509 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
510 break;
511 case MODIFY:
512 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
513 break;
514 case DELETE:
515 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
516 break;
517 default:
518 log.warn("Unknown group operation type {}", operation.opType());
519 }
520
521 ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
522 getGroupKeyTable(existing.deviceId());
523 ConcurrentMap<GroupId, StoredGroupEntry> idTable =
524 getGroupIdTable(existing.deviceId());
525 idTable.remove(existing.id());
526 keyTable.remove(existing.appCookie());
527 }
528
529 @Override
530 public void addOrUpdateExtraneousGroupEntry(Group group) {
531 ConcurrentMap<GroupId, Group> extraneousIdTable =
532 getExtraneousGroupIdTable(group.deviceId());
533 extraneousIdTable.put(group.id(), group);
534 // Check the reference counter
535 if (group.referenceCount() == 0) {
536 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
537 }
538 }
539
540 @Override
541 public void removeExtraneousGroupEntry(Group group) {
542 ConcurrentMap<GroupId, Group> extraneousIdTable =
543 getExtraneousGroupIdTable(group.deviceId());
544 extraneousIdTable.remove(group.id());
545 }
546
547 @Override
548 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
549 // flatten and make iterator unmodifiable
550 return FluentIterable.from(
551 getExtraneousGroupIdTable(deviceId).values());
552 }
553
554
555}