blob: 4b9035e4a6c2da418c937788cec5ea591d0536fe [file] [log] [blame]
/**
* Copyright 2016 Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.impl;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Local store of ONOS events destined for Kafka, together with a periodic
 * garbage collector that discards events which have already been published.
 *
 * <p>Events are held in a sorted in-memory map and compared against the
 * sequence number recorded by {@link #updateLastPublishedEntry(Long)}. That
 * sequence number is kept in an {@link AtomicValue} obtained from the
 * {@link StorageService}.
 *
 * <p>NOTE(review): the backing {@link TreeMap} is not thread-safe and the
 * garbage collector runs on its own executor thread. Today only that thread
 * touches the map; once {@link #insertCacheEntry(OnosEvent)} is implemented,
 * access must be synchronized or the map replaced by a concurrent one.
 */
@Component(immediate = true)
public class KafkaStorageManager implements KafkaEventStorageService {

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    // Events ordered by their Long key; the GC trims the published head.
    private TreeMap<Long, OnosEvent> kafkaEventStore;

    // Sequence number of the most recently published event.
    private AtomicValue<Long> lastPublishedEvent;

    private final Logger log = LoggerFactory.getLogger(getClass());

    private ScheduledExecutorService gcExService;
    private InternalGarbageCollector gcTask;

    // Thread scheduler parameters (in seconds, see startGC()).
    private final long delay = 0;
    private final long period = 1;

    /**
     * Creates the event store, obtains the published-sequence-number value
     * from the storage service and starts the garbage collector.
     */
    @Activate
    protected void activate() {
        kafkaEventStore = new TreeMap<>();
        lastPublishedEvent = storageService.<Long>atomicValueBuilder()
                .withName("onos-app-kafka-published-seqNumber")
                .build()
                .asAtomicValue();

        startGC();
        log.info("Started");
    }

    /**
     * Schedules the garbage collection task on a dedicated single thread,
     * first after {@code delay} seconds and then every {@code period} seconds.
     */
    private void startGC() {
        log.info("Starting Garbage Collection Service");
        gcExService = Executors.newSingleThreadScheduledExecutor();
        gcTask = new InternalGarbageCollector();
        gcExService.scheduleAtFixedRate(gcTask, delay, period,
                                        TimeUnit.SECONDS);
    }

    /**
     * Stops the garbage collector when the component is deactivated.
     */
    @Deactivate
    protected void deactivate() {
        stopGC();
        log.info("Stopped");
    }

    /**
     * Shuts down the garbage collection executor, if one was started.
     */
    private void stopGC() {
        log.info("Stopping Garbage Collection Service");
        // activate() may have failed before the executor was created.
        if (gcExService != null) {
            gcExService.shutdown();
        }
    }

    @Override
    public boolean insertCacheEntry(OnosEvent e) {
        // TODO: Fill in the code once the event carries timestamp info.
        return true;
    }

    @Override
    public void updateLastPublishedEntry(Long sequenceNumber) {
        this.lastPublishedEvent.set(sequenceNumber);
    }

    /**
     * Removes events from the Kafka Event Store which have been published,
     * i.e. every entry whose key is less than or equal to the last published
     * sequence number.
     */
    private class InternalGarbageCollector implements Runnable {

        @Override
        public void run() {
            // An exception escaping a task scheduled with scheduleAtFixedRate
            // suppresses all subsequent executions, so nothing may propagate
            // out of this method.
            try {
                Long lastPublished = lastPublishedEvent.get();
                if (lastPublished == null) {
                    // Presumably nothing has been published yet. A null key
                    // would make TreeMap.headMap throw NullPointerException.
                    return;
                }
                kafkaEventStore.headMap(lastPublished, true).clear();
            } catch (RuntimeException e) {
                log.warn("Garbage collection of published events failed", e);
            }
        }
    }
}