blob: 8ea15a2fba556ce7881ca5ebacecd3d568c96b6e [file] [log] [blame]
Shravan Ambati1e8471e2016-07-13 14:03:17 -07001/**
Brian O'Connor0a4e6742016-09-15 23:03:10 -07002 * Copyright 2016-present Open Networking Laboratory
Shravan Ambati1e8471e2016-07-13 14:03:17 -07003 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6
7 * http://www.apache.org/licenses/LICENSE-2.0
8
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15package org.onosproject.kafkaintegration.impl;
16
Shravan Ambati1e8471e2016-07-13 14:03:17 -070017import org.apache.felix.scr.annotations.Activate;
18import org.apache.felix.scr.annotations.Component;
19import org.apache.felix.scr.annotations.Deactivate;
20import org.apache.felix.scr.annotations.Reference;
21import org.apache.felix.scr.annotations.ReferenceCardinality;
22import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
23import org.onosproject.kafkaintegration.api.dto.OnosEvent;
24import org.onosproject.store.service.AtomicValue;
25import org.onosproject.store.service.StorageService;
26import org.slf4j.Logger;
27import org.slf4j.LoggerFactory;
28
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070029import java.util.TreeMap;
30import java.util.concurrent.Executors;
31import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.TimeUnit;
33
Shravan Ambati4c6295e2016-12-20 14:53:06 -080034/**
35 * TODO: This code is not being used at the moment.
36 * This will be modified to use Distributed Work Queue.
37 * Please see clustering section of
38 * https://wiki.onosproject.org/display/ONOS/Kafka+Integration
39 */
Shravan Ambati5a11e172016-07-21 15:55:28 -070040@Component(immediate = false)
Shravan Ambati1e8471e2016-07-13 14:03:17 -070041public class KafkaStorageManager implements KafkaEventStorageService {
42
43 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
44 protected StorageService storageService;
45
46 private TreeMap<Long, OnosEvent> kafkaEventStore;
47
48 private AtomicValue<Long> lastPublishedEvent;
49
50 private final Logger log = LoggerFactory.getLogger(getClass());
51
52 private ScheduledExecutorService gcExService;
53
54 private InternalGarbageCollector gcTask;
55
56 // Thread scheduler parameters.
57 private final long delay = 0;
58 private final long period = 1;
59
60 @Activate
61 protected void activate() {
62 kafkaEventStore = new TreeMap<Long, OnosEvent>();
63 lastPublishedEvent = storageService.<Long>atomicValueBuilder()
64 .withName("onos-app-kafka-published-seqNumber").build()
65 .asAtomicValue();
66
67 startGC();
68
69 log.info("Started");
70 }
71
72 private void startGC() {
73 log.info("Starting Garbage Collection Service");
74 gcExService = Executors.newSingleThreadScheduledExecutor();
75 gcTask = new InternalGarbageCollector();
76 gcExService.scheduleAtFixedRate(gcTask, delay, period,
77 TimeUnit.SECONDS);
78 }
79
80 @Deactivate
81 protected void deactivate() {
82 stopGC();
83 log.info("Stopped");
84 }
85
86 private void stopGC() {
87 log.info("Stopping Garbage Collection Service");
88 gcExService.shutdown();
89 }
90
91 @Override
92 public boolean insertCacheEntry(OnosEvent e) {
93 // TODO: Fill in the code once the event carries timestamp info.
94 return true;
95 }
96
97 @Override
98 public void updateLastPublishedEntry(Long sequenceNumber) {
99 this.lastPublishedEvent.set(sequenceNumber);
100 }
101
102 /**
103 * Removes events from the Kafka Event Store which have been published.
104 *
105 */
106 private class InternalGarbageCollector implements Runnable {
107
108 @Override
109 public void run() {
110 kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
111 }
112 }
113
114}