blob: 09a558d464b95d0c80d46101267b70b21407f2f2 [file] [log] [blame]
Brian O'Connor6de2e202015-05-21 14:30:41 -07001package org.onosproject.incubator.store.resource.impl;
jccde3e92e2015-03-28 01:40:44 -07002
3import static org.onlab.util.Tools.groupedThreads;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Collection;
7import java.util.Collections;
8import java.util.HashSet;
9import java.util.Iterator;
10import java.util.Map;
11import java.util.Set;
12import java.util.concurrent.ExecutionException;
13import java.util.concurrent.ExecutorService;
14import java.util.concurrent.Executors;
15import java.util.concurrent.Future;
16import java.util.concurrent.TimeUnit;
17import java.util.concurrent.TimeoutException;
18import java.util.concurrent.locks.ReentrantReadWriteLock;
19
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.net.Device;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.device.DeviceService;
Brian O'Connor6de2e202015-05-21 14:30:41 -070031import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
32import org.onosproject.incubator.net.resource.label.LabelResource;
33import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
34import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
35import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
36import org.onosproject.incubator.net.resource.label.LabelResourceId;
37import org.onosproject.incubator.net.resource.label.LabelResourcePool;
38import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
39import org.onosproject.incubator.net.resource.label.LabelResourceStore;
jccde3e92e2015-03-28 01:40:44 -070040import org.onosproject.store.AbstractStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.ClusterMessage;
43import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
44import org.onosproject.store.flow.ReplicaInfo;
45import org.onosproject.store.flow.ReplicaInfoService;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.serializers.KryoSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070048import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
jccde3e92e2015-03-28 01:40:44 -070049import org.onosproject.store.service.ConsistentMap;
50import org.onosproject.store.service.Serializer;
51import org.onosproject.store.service.StorageService;
52import org.slf4j.Logger;
53
54import com.google.common.collect.ImmutableSet;
55import com.google.common.collect.Multimap;
56
57/**
58 * Manages label resources using copycat.
59 */
60@Component(immediate = true, enabled = true)
61@Service
62public class DistributedLabelResourceStore
63 extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
64 implements LabelResourceStore {
65 private final Logger log = getLogger(getClass());
66
67 private static final String POOL_MAP_NAME = "labelresourcepool";
68
69 private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
70 // primary data:
71 // read/write needs to be locked
72 private final ReentrantReadWriteLock resourcePoolLock = new ReentrantReadWriteLock();
73
74 private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected StorageService storageService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected ReplicaInfoService replicaInfoManager;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected ClusterCommunicationService clusterCommunicator;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected ClusterService clusterService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected DeviceService deviceService;
90
91 private ExecutorService messageHandlingExecutor;
92 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
93 private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
94
95 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
96 @Override
97 protected void setupKryoPool() {
98 serializerPool = KryoNamespace.newBuilder()
99 .register(DistributedStoreSerializers.STORE_COMMON)
100 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
101 .register(LabelResourceEvent.class)
102 .register(LabelResourcePool.class).register(DeviceId.class)
103 .register(LabelResourceRequest.class)
104 .register(LabelResourceRequest.Type.class)
105 .register(LabelResourceEvent.Type.class)
106 .register(DefaultLabelResource.class)
107 .register(LabelResourceId.class).build();
108 }
109 };
110
111 @Activate
112 public void activate() {
113
114 resourcePool = storageService
115 .<DeviceId, LabelResourcePool>consistentMapBuilder()
116 .withName(POOL_MAP_NAME).withSerializer(new Serializer() {
117 KryoNamespace kryo = new KryoNamespace.Builder()
118 .register(KryoNamespaces.API).build();
119
120 @Override
121 public <T> byte[] encode(T object) {
122 return kryo.serialize(object);
123 }
124
125 @Override
126 public <T> T decode(byte[] bytes) {
127 return kryo.deserialize(bytes);
128 }
129 }).withPartitionsDisabled().build();
130 messageHandlingExecutor = Executors
131 .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
132 groupedThreads("onos/store/flow",
133 "message-handlers"));
134 clusterCommunicator
135 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
136 new ClusterMessageHandler() {
137
138 @Override
139 public void handle(ClusterMessage message) {
140 LabelResourcePool operation = SERIALIZER
141 .decode(message.payload());
142 log.trace("received get flow entry request for {}",
143 operation);
144 boolean b = internalCreate(operation);
145 message.respond(SERIALIZER.encode(b));
146 }
147 }, messageHandlingExecutor);
148 clusterCommunicator
149 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
150 new ClusterMessageHandler() {
151
152 @Override
153 public void handle(ClusterMessage message) {
154 DeviceId deviceId = SERIALIZER
155 .decode(message.payload());
156 log.trace("received get flow entry request for {}",
157 deviceId);
158 boolean b = internalDestroy(deviceId);
159 message.respond(SERIALIZER.encode(b));
160 }
161 }, messageHandlingExecutor);
162 clusterCommunicator
163 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
164 new ClusterMessageHandler() {
165
166 @Override
167 public void handle(ClusterMessage message) {
168 LabelResourceRequest request = SERIALIZER
169 .decode(message.payload());
170 log.trace("received get flow entry request for {}",
171 request);
172 final Collection<LabelResource> resource = internalApply(request);
173 message.respond(SERIALIZER
174 .encode(resource));
175 }
176 }, messageHandlingExecutor);
177 clusterCommunicator
178 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
179 new ClusterMessageHandler() {
180
181 @Override
182 public void handle(ClusterMessage message) {
183 LabelResourceRequest request = SERIALIZER
184 .decode(message.payload());
185 log.trace("received get flow entry request for {}",
186 request);
187 final boolean isSuccess = internalRelease(request);
188 message.respond(SERIALIZER
189 .encode(isSuccess));
190 }
191 }, messageHandlingExecutor);
192 log.info("Started");
193 }
194
195 @Deactivate
196 public void deactivate() {
197 clusterCommunicator
198 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
199 clusterCommunicator
200 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
201 clusterCommunicator
202 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
203 clusterCommunicator
204 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
205 messageHandlingExecutor.shutdown();
206 log.info("Stopped");
207 }
208
209 @Override
210 public boolean createDevicePool(DeviceId deviceId,
211 LabelResourceId beginLabel,
212 LabelResourceId endLabel) {
213 LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
214 beginLabel.labelId(),
215 endLabel.labelId());
216 return this.create(pool);
217 }
218
219 @Override
220 public boolean createGlobalPool(LabelResourceId beginLabel,
221 LabelResourceId endLabel) {
222 LabelResourcePool pool = new LabelResourcePool(
223 GLOBAL_RESOURCE_POOL_DEVICE_ID,
224 beginLabel.labelId(),
225 endLabel.labelId());
226 return this.internalCreate(pool);
227 }
228
229 private boolean create(LabelResourcePool pool) {
230 Device device = (Device) deviceService.getDevice(pool.deviceId());
231 if (device == null) {
232 return false;
233 }
234
235 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(pool
236 .deviceId());
237
238 if (!replicaInfo.master().isPresent()) {
239 log.warn("Failed to getFlowEntries: No master for {}", pool);
240 return false;
241 }
242
243 if (replicaInfo.master().get()
244 .equals(clusterService.getLocalNode().id())) {
245 return internalCreate(pool);
246 }
247
248 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
249 replicaInfo.master().orNull(), pool.deviceId());
250
251 return complete(clusterCommunicator
252 .sendAndReceive(pool,
253 LabelResourceMessageSubjects.LABEL_POOL_CREATED,
254 SERIALIZER::encode, SERIALIZER::decode,
255 replicaInfo.master().get()));
256 }
257
258 private boolean internalCreate(LabelResourcePool pool) {
259 resourcePoolLock.writeLock().lock();
260 LabelResourcePool poolOld = resourcePool.get(pool.deviceId()).value();
261 if (poolOld == null) {
262 resourcePool.put(pool.deviceId(), pool);
263 resourcePoolLock.writeLock().unlock();
264 LabelResourceEvent event = new LabelResourceEvent(
265 Type.POOL_CREATED,
266 pool);
267 notifyDelegate(event);
268 return true;
269 }
270 resourcePoolLock.writeLock().unlock();
271 return false;
272 }
273
274 @Override
275 public boolean destroyDevicePool(DeviceId deviceId) {
276 Device device = (Device) deviceService.getDevice(deviceId);
277 if (device == null) {
278 return false;
279 }
280 ReplicaInfo replicaInfo = replicaInfoManager
281 .getReplicaInfoFor(deviceId);
282
283 if (!replicaInfo.master().isPresent()) {
284 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
285 return false;
286 }
287
288 if (replicaInfo.master().get()
289 .equals(clusterService.getLocalNode().id())) {
290 return internalDestroy(deviceId);
291 }
292
293 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
294 replicaInfo.master().orNull(), deviceId);
295
296 return complete(clusterCommunicator
297 .sendAndReceive(deviceId,
298 LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
299 SERIALIZER::encode, SERIALIZER::decode,
300 replicaInfo.master().get()));
301 }
302
303 private boolean internalDestroy(DeviceId deviceId) {
304 LabelResourcePool poolOld = resourcePool.get(deviceId).value();
305 if (poolOld != null) {
306 resourcePool.remove(deviceId);
307 LabelResourceEvent event = new LabelResourceEvent(
308 Type.POOL_CREATED,
309 poolOld);
310 notifyDelegate(event);
311 }
312 log.info("success to destroy the label resource pool of device id {}",
313 deviceId);
314 return true;
315 }
316
317 @Override
318 public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
319 long applyNum) {
320 Device device = (Device) deviceService.getDevice(deviceId);
321 if (device == null) {
322 return Collections.emptyList();
323 }
324 LabelResourceRequest request = new LabelResourceRequest(
325 deviceId,
326 LabelResourceRequest.Type.APPLY,
327 applyNum, null);
328 ReplicaInfo replicaInfo = replicaInfoManager
329 .getReplicaInfoFor(deviceId);
330
331 if (!replicaInfo.master().isPresent()) {
332 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
333 return Collections.emptyList();
334 }
335
336 if (replicaInfo.master().get()
337 .equals(clusterService.getLocalNode().id())) {
338 return internalApply(request);
339 }
340
341 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
342 replicaInfo.master().orNull(), deviceId);
343
344 return complete(clusterCommunicator
345 .sendAndReceive(request,
346 LabelResourceMessageSubjects.LABEL_POOL_APPLY,
347 SERIALIZER::encode, SERIALIZER::decode,
348 replicaInfo.master().get()));
349 }
350
351 private Collection<LabelResource> internalApply(LabelResourceRequest request) {
352 resourcePoolLock.writeLock().lock();
353 DeviceId deviceId = request.deviceId();
354 long applyNum = request.applyNum();
355 LabelResourcePool pool = resourcePool.get(deviceId).value();
356 Collection<LabelResource> result = new HashSet<LabelResource>();
357 long freeNum = this.getFreeNumOfDevicePool(deviceId);
358 if (applyNum > freeNum) {
359 log.info("the free number of the label resource pool of deviceId {} is not enough.");
360 resourcePoolLock.writeLock().unlock();
361 return Collections.emptyList();
362 }
363 Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
364 pool.releaseLabelId());
365 long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
366 .size();
367 LabelResource resource = null;
368 for (int i = 0; i < tmp; i++) {
369 Iterator<LabelResource> it = releaseLabels.iterator();
370 if (it.hasNext()) {
371 resource = it.next();
372 releaseLabels.remove(resource);
373 }
374 result.add(resource);
375 }
376 for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
377 .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
378 resource = new DefaultLabelResource(deviceId,
379 LabelResourceId
380 .labelResourceId(j));
381 result.add(resource);
382 }
383 long beginLabel = pool.beginLabel().labelId();
384 long endLabel = pool.endLabel().labelId();
385 long totalNum = pool.totalNum();
386 long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
387 long usedNum = pool.usedNum() + applyNum;
388 ImmutableSet<LabelResource> freeLabel = ImmutableSet
389 .copyOf(releaseLabels);
390 LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
391 beginLabel, endLabel,
392 totalNum, usedNum,
393 current, freeLabel);
394 resourcePool.put(deviceId, newPool);
395 log.info("success to apply label resource");
396 resourcePoolLock.writeLock().unlock();
397 return result;
398 }
399
400 @Override
401 public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
402 Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
403 Set<DeviceId> deviceIdSet = maps.keySet();
404 LabelResourceRequest request = null;
405 for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
406 DeviceId deviceId = (DeviceId) it.next();
407 Device device = (Device) deviceService.getDevice(deviceId);
408 if (device == null) {
409 continue;
410 }
411 ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
412 .get(deviceId));
413 request = new LabelResourceRequest(
414 deviceId,
415 LabelResourceRequest.Type.RELEASE,
416 0, collection);
417 ReplicaInfo replicaInfo = replicaInfoManager
418 .getReplicaInfoFor(deviceId);
419
420 if (!replicaInfo.master().isPresent()) {
421 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
422 return false;
423 }
424
425 if (replicaInfo.master().get()
426 .equals(clusterService.getLocalNode().id())) {
427 return internalRelease(request);
428 }
429
430 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
431 replicaInfo.master().orNull(), deviceId);
432
433 return complete(clusterCommunicator
434 .sendAndReceive(request,
435 LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
436 SERIALIZER::encode, SERIALIZER::decode,
437 replicaInfo.master().get()));
438 }
439 return false;
440 }
441
442 private boolean internalRelease(LabelResourceRequest request) {
443 resourcePoolLock.writeLock().lock();
444 DeviceId deviceId = request.deviceId();
445 Collection<LabelResource> release = request.releaseCollection();
446 LabelResourcePool pool = resourcePool.get(deviceId).value();
447 if (pool == null) {
448 resourcePoolLock.writeLock().unlock();
449 log.info("the label resource pool of device id {} does not exist");
450 return false;
451 }
452 Set<LabelResource> storeSet = new HashSet<LabelResource>(
453 pool.releaseLabelId());
454 LabelResource labelResource = null;
455 long realReleasedNum = 0;
456 for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
457 labelResource = it.next();
458 if (labelResource.labelResourceId().labelId() < pool.beginLabel()
459 .labelId()
460 || labelResource.labelResourceId().labelId() > pool
461 .endLabel().labelId()) {
462 continue;
463 }
464 if (pool.currentUsedMaxLabelId().labelId() > labelResource
465 .labelResourceId().labelId()
466 || !storeSet.contains(labelResource)) {
467 storeSet.add(labelResource);
468 realReleasedNum++;
469 }
470 }
471 long beginNum = pool.beginLabel().labelId();
472 long endNum = pool.endLabel().labelId();
473 long totalNum = pool.totalNum();
474 long usedNum = pool.usedNum() - realReleasedNum;
475 long current = pool.currentUsedMaxLabelId().labelId();
476 ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
477 LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
478 beginNum, endNum,
479 totalNum, usedNum,
480 current, s);
481 resourcePool.put(deviceId, newPool);
482 log.info("success to release label resource");
483 resourcePoolLock.writeLock().unlock();
484 return true;
485 }
486
487 @Override
488 public boolean isDevicePoolFull(DeviceId deviceId) {
489 LabelResourcePool pool = resourcePool.get(deviceId).value();
490 if (pool == null) {
491 return true;
492 }
493 return pool.currentUsedMaxLabelId() == pool.endLabel()
494 && pool.releaseLabelId().size() == 0 ? true : false;
495 }
496
497 @Override
498 public long getFreeNumOfDevicePool(DeviceId deviceId) {
499 LabelResourcePool pool = resourcePool.get(deviceId).value();
500 if (pool == null) {
501 return 0;
502 }
503 return pool.endLabel().labelId()
504 - pool.currentUsedMaxLabelId().labelId()
505 + pool.releaseLabelId().size();
506 }
507
508 @Override
509 public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
510 return resourcePool.get(deviceId).value();
511 }
512
513 @Override
514 public boolean destroyGlobalPool() {
515 return this.internalDestroy(DeviceId
516 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
517 }
518
519 @Override
520 public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
521 LabelResourceRequest request = new LabelResourceRequest(
522 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
523 LabelResourceRequest.Type.APPLY,
524 applyNum, null);
525 return this.internalApply(request);
526 }
527
528 @Override
529 public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
530 Set<LabelResource> set = new HashSet<LabelResource>();
531 DefaultLabelResource resource = null;
532 for (LabelResourceId labelResource : release) {
533 resource = new DefaultLabelResource(
534 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
535 labelResource);
536 set.add(resource);
537 }
538 LabelResourceRequest request = new LabelResourceRequest(
539 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
540 LabelResourceRequest.Type.APPLY,
541 0,
542 ImmutableSet
543 .copyOf(set));
544 return this.internalRelease(request);
545 }
546
547 @Override
548 public boolean isGlobalPoolFull() {
549 return this.isDevicePoolFull(DeviceId
550 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
551 }
552
553 @Override
554 public long getFreeNumOfGlobalPool() {
555 return this.getFreeNumOfDevicePool(DeviceId
556 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
557 }
558
559 @Override
560 public LabelResourcePool getGlobalLabelResourcePool() {
561 return this.getDeviceLabelResourcePool(DeviceId
562 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
563 }
564
565 private <T> T complete(Future<T> future) {
566 try {
567 return future.get(PEER_REQUEST_TIMEOUT_MS,
568 TimeUnit.MILLISECONDS);
569 } catch (InterruptedException e) {
570 Thread.currentThread().interrupt();
571 log.error("Interrupted while waiting for operation to complete.", e);
572 return null;
573 } catch (TimeoutException | ExecutionException e) {
574 log.error("Failed remote operation", e);
575 return null;
576 }
577 }
578}