blob: bb6fa05c33152071fdb4444b5341c5406b651fb0 [file] [log] [blame]
Shravan Ambati1e8471e2016-07-13 14:03:17 -07001/**
Brian O'Connor0a4e6742016-09-15 23:03:10 -07002 * Copyright 2016-present Open Networking Laboratory
Shravan Ambati1e8471e2016-07-13 14:03:17 -07003 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6
7 * http://www.apache.org/licenses/LICENSE-2.0
8
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15package org.onosproject.kafkaintegration.impl;
16
Shravan Ambati1e8471e2016-07-13 14:03:17 -070017import org.apache.felix.scr.annotations.Activate;
18import org.apache.felix.scr.annotations.Component;
Shravan Ambatia4875d82017-01-09 13:06:51 -080019import org.apache.felix.scr.annotations.Service;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070020import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
24import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatia4875d82017-01-09 13:06:51 -080025import org.onosproject.store.serializers.KryoNamespaces;
26import org.onosproject.store.service.Serializer;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070027import org.onosproject.store.service.StorageService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080028import org.onosproject.store.service.Task;
29import org.onosproject.store.service.WorkQueue;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Shravan Ambatia4875d82017-01-09 13:06:51 -080033import java.util.concurrent.CompletableFuture;
34import java.util.concurrent.ExecutionException;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070035
Shravan Ambati5a11e172016-07-21 15:55:28 -070036@Component(immediate = false)
Shravan Ambatia4875d82017-01-09 13:06:51 -080037@Service
Shravan Ambati1e8471e2016-07-13 14:03:17 -070038public class KafkaStorageManager implements KafkaEventStorageService {
39
40 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
41 protected StorageService storageService;
42
Shravan Ambati1e8471e2016-07-13 14:03:17 -070043 private final Logger log = LoggerFactory.getLogger(getClass());
44
Shravan Ambatia4875d82017-01-09 13:06:51 -080045 private static final String KAFKA_WORK_QUEUE = "Kafka-Work-Queue";
Shravan Ambati1e8471e2016-07-13 14:03:17 -070046
Shravan Ambatia4875d82017-01-09 13:06:51 -080047 private WorkQueue<OnosEvent> queue;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070048
49 @Activate
50 protected void activate() {
Shravan Ambatia4875d82017-01-09 13:06:51 -080051 queue = storageService.<OnosEvent>getWorkQueue(KAFKA_WORK_QUEUE,
52 Serializer.using(KryoNamespaces.API,
53 OnosEvent.class,
54 OnosEvent.Type.class));
Shravan Ambati1e8471e2016-07-13 14:03:17 -070055
56 log.info("Started");
57 }
58
Shravan Ambati1e8471e2016-07-13 14:03:17 -070059 @Deactivate
60 protected void deactivate() {
Shravan Ambatia4875d82017-01-09 13:06:51 -080061 queue = null;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070062 log.info("Stopped");
63 }
64
Shravan Ambatia4875d82017-01-09 13:06:51 -080065 @Override
66 public void publishEvent(OnosEvent e) {
67 queue.addOne(e);
68 log.debug("Published {} Event to Distributed Work Queue", e.type());
Shravan Ambati1e8471e2016-07-13 14:03:17 -070069 }
70
71 @Override
Shravan Ambatia4875d82017-01-09 13:06:51 -080072 public OnosEvent consumeEvent() {
73 Task<OnosEvent> task = null;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070074
Shravan Ambatia4875d82017-01-09 13:06:51 -080075 CompletableFuture<Task<OnosEvent>> future = queue.take();
76 try {
77 task = future.get();
78 } catch (InterruptedException e) {
79 e.printStackTrace();
80 } catch (ExecutionException e) {
81 e.printStackTrace();
Shravan Ambati1e8471e2016-07-13 14:03:17 -070082 }
Shravan Ambatia4875d82017-01-09 13:06:51 -080083
84 if (task != null) {
85 queue.complete(task.taskId());
86 log.debug("Consumed {} Event from Distributed Work Queue with id {}",
87 task.payload().type(), task.taskId());
88 return task.payload();
89 }
90
91 return null;
Shravan Ambati1e8471e2016-07-13 14:03:17 -070092 }
93
94}