blob: 52d166cb733b6985ba8913a2b47f272fa3723a71 [file] [log] [blame]
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.net.intent.IntentBatchDelegate;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.store.hz.SQueue;
import org.onlab.onos.store.hz.StoreService;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class HazelcastIntentBatchQueue
implements IntentBatchService {
private final Logger log = getLogger(getClass());
private static final String TOPIC_BASE = "intent-batch-";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
private HazelcastInstance theInstance;
private ControllerNode localControllerNode;
protected StoreSerializer serializer;
private IntentBatchDelegate delegate;
private InternalLeaderListener leaderListener = new InternalLeaderListener();
private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
= Maps.newHashMap(); // FIXME make distributed?
private final Set<ApplicationId> myTopics = Sets.newHashSet();
private final Map<ApplicationId, IntentOperations> outstandingOps
= Maps.newHashMap();
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
localControllerNode = clusterService.getLocalNode();
leadershipService.addListener(leaderListener);
serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
log.info("Started");
}
@Deactivate
public void deactivate() {
leadershipService.removeListener(leaderListener);
for (ApplicationId appId: batchQueues.keySet()) {
leadershipService.withdraw(getTopic(appId));
}
log.info("Stopped");
}
public static String getTopic(ApplicationId appId) {
return TOPIC_BASE + checkNotNull(appId.id());
}
public ApplicationId getAppId(String topic) {
checkState(topic.startsWith(TOPIC_BASE),
"Trying to get app id for invalid topic: {}", topic);
Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
return coreService.getAppId(id);
}
private SQueue<IntentOperations> getQueue(ApplicationId appId) {
SQueue<IntentOperations> queue = batchQueues.get(appId);
if (queue == null) {
synchronized (this) {
String topic = getTopic(appId);
IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
queue = new SQueue<>(rawQueue, serializer);
queue.addItemListener(new InternalItemListener(appId), false);
batchQueues.putIfAbsent(appId, queue);
leadershipService.runForLeadership(topic);
}
}
return queue;
}
@Override
public void addIntentOperations(IntentOperations ops) {
checkNotNull(ops, "Intent operations cannot be null.");
ApplicationId appId = ops.appId();
getQueue(appId).add(ops); // TODO consider using put here
dispatchNextOperation(appId);
}
@Override
public void removeIntentOperations(IntentOperations ops) {
ApplicationId appId = ops.appId();
synchronized (this) {
checkState(outstandingOps.get(appId).equals(ops),
"Operations not found.");
SQueue<IntentOperations> queue = batchQueues.get(appId);
// TODO consider alternatives to remove
checkState(queue.remove().equals(ops),
"Operations are wrong.");
outstandingOps.remove(appId);
dispatchNextOperation(appId);
}
}
/**
* Dispatches the next available operations to the delegate, unless
* we are not the leader for this application id or there is an
* outstanding operations for this application id.
*
* @param appId application id
*/
private void dispatchNextOperation(ApplicationId appId) {
synchronized (this) {
if (!myTopics.contains(appId) ||
outstandingOps.containsKey(appId)) {
return;
}
IntentOperations ops = batchQueues.get(appId).peek();
if (ops != null) {
outstandingOps.put(appId, ops);
delegate.execute(ops);
}
}
}
/**
* Record the leadership change for the given topic. If we have become the
* leader, then dispatch the next operations. If we have lost leadership,
* then cancel the last operations.
*
* @param topic topic based on application id
* @param leader true if we have become the leader, false otherwise
*/
private void leaderChanged(String topic, boolean leader) {
ApplicationId appId = getAppId(topic);
//TODO we are using the event caller's thread, should we use our own?
synchronized (this) {
if (leader) {
myTopics.add(appId);
checkState(!outstandingOps.containsKey(appId),
"Existing intent ops for app id: {}", appId);
dispatchNextOperation(appId);
} else {
myTopics.remove(appId);
IntentOperations ops = outstandingOps.get(appId);
if (ops != null) {
delegate.cancel(ops);
}
outstandingOps.remove(appId);
}
}
}
private class InternalItemListener implements ItemListener<IntentOperations> {
private final ApplicationId appId;
public InternalItemListener(ApplicationId appId) {
this.appId = appId;
}
@Override
public void itemAdded(ItemEvent<IntentOperations> item) {
dispatchNextOperation(appId);
}
@Override
public void itemRemoved(ItemEvent<IntentOperations> item) {
// no-op
}
}
private class InternalLeaderListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
log.trace("Leadership Event: time = {} type = {} event = {}",
event.time(), event.type(), event);
String topic = event.subject().topic();
if (!topic.startsWith(TOPIC_BASE)) {
return; // Not our topic: ignore
}
if (!event.subject().leader().id().equals(localControllerNode.id())) {
// run for leadership
getQueue(getAppId(topic));
return; // The event is not about this instance: ignore
}
switch (event.type()) {
case LEADER_ELECTED:
log.info("Elected leader for app {}", getAppId(topic));
leaderChanged(topic, true);
break;
case LEADER_BOOTED:
log.info("Lost leader election for app {}", getAppId(topic));
leaderChanged(topic, false);
break;
case LEADER_REELECTED:
break;
default:
break;
}
}
}
@Override
public Set<IntentOperations> getPendingOperations() {
Set<IntentOperations> ops = Sets.newHashSet();
synchronized (this) {
for (SQueue<IntentOperations> queue : batchQueues.values()) {
ops.addAll(queue);
}
return ops;
}
}
@Override
public Set<IntentOperations> getCurrentOperations() {
//FIXME this is not really implemented
return Collections.emptySet();
}
@Override
public boolean isLocalLeader(ApplicationId applicationId) {
return myTopics.contains(applicationId);
}
@Override
public void setDelegate(IntentBatchDelegate delegate) {
this.delegate = checkNotNull(delegate, "Delegate cannot be null");
}
@Override
public void unsetDelegate(IntentBatchDelegate delegate) {
if (this.delegate != null && this.delegate.equals(delegate)) {
this.delegate = null;
}
}
}