blob: 4b9035e4a6c2da418c937788cec5ea591d0536fe [file] [log] [blame]
/**
 * Copyright 2016 Open Networking Laboratory
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15package org.onosproject.kafkaintegration.impl;
16
17import java.util.TreeMap;
18import java.util.concurrent.Executors;
19import java.util.concurrent.ScheduledExecutorService;
20import java.util.concurrent.TimeUnit;
21
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
28import org.onosproject.kafkaintegration.api.dto.OnosEvent;
29import org.onosproject.store.service.AtomicValue;
30import org.onosproject.store.service.StorageService;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34@Component(immediate = true)
35public class KafkaStorageManager implements KafkaEventStorageService {
36
37 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
38 protected StorageService storageService;
39
40 private TreeMap<Long, OnosEvent> kafkaEventStore;
41
42 private AtomicValue<Long> lastPublishedEvent;
43
44 private final Logger log = LoggerFactory.getLogger(getClass());
45
46 private ScheduledExecutorService gcExService;
47
48 private InternalGarbageCollector gcTask;
49
50 // Thread scheduler parameters.
51 private final long delay = 0;
52 private final long period = 1;
53
54 @Activate
55 protected void activate() {
56 kafkaEventStore = new TreeMap<Long, OnosEvent>();
57 lastPublishedEvent = storageService.<Long>atomicValueBuilder()
58 .withName("onos-app-kafka-published-seqNumber").build()
59 .asAtomicValue();
60
61 startGC();
62
63 log.info("Started");
64 }
65
66 private void startGC() {
67 log.info("Starting Garbage Collection Service");
68 gcExService = Executors.newSingleThreadScheduledExecutor();
69 gcTask = new InternalGarbageCollector();
70 gcExService.scheduleAtFixedRate(gcTask, delay, period,
71 TimeUnit.SECONDS);
72 }
73
74 @Deactivate
75 protected void deactivate() {
76 stopGC();
77 log.info("Stopped");
78 }
79
80 private void stopGC() {
81 log.info("Stopping Garbage Collection Service");
82 gcExService.shutdown();
83 }
84
85 @Override
86 public boolean insertCacheEntry(OnosEvent e) {
87 // TODO: Fill in the code once the event carries timestamp info.
88 return true;
89 }
90
91 @Override
92 public void updateLastPublishedEntry(Long sequenceNumber) {
93 this.lastPublishedEvent.set(sequenceNumber);
94 }
95
96 /**
97 * Removes events from the Kafka Event Store which have been published.
98 *
99 */
100 private class InternalGarbageCollector implements Runnable {
101
102 @Override
103 public void run() {
104 kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
105 }
106 }
107
108}