blob: b8f2080d5e8b3ea3d5cfa22a91f0eaa76094e679 [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.store.meter.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.meter.DefaultBand;
import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterEvent;
import org.onosproject.incubator.net.meter.MeterFailReason;
import org.onosproject.incubator.net.meter.MeterId;
import org.onosproject.incubator.net.meter.MeterOperation;
import org.onosproject.incubator.net.meter.MeterStore;
import org.onosproject.incubator.net.meter.MeterStoreDelegate;
import org.onosproject.mastership.MastershipService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A distributed meter store implementation. Meters are stored consistently
* across the cluster.
*/
public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
implements MeterStore {
private Logger log = getLogger(getClass());
private static final String METERSTORE = "onos-meter-store";
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Number of threads in the message handler pool")
private int msgPoolSize;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private ConsistentMap<MeterId, Meter> meters;
private NodeId local;
private KryoNamespace kryoNameSpace;
private Serializer serializer;
@Activate
public void activate() {
local = clusterService.getLocalNode().id();
kryoNameSpace =
KryoNamespace.newBuilder()
.register(DefaultMeter.class)
.register(DefaultBand.class)
.build();
serializer = Serializer.using(kryoNameSpace);
meters = storageService.<MeterId, Meter>consistentMapBuilder()
.withName(METERSTORE)
.withSerializer(serializer)
.build();
ExecutorService executors = Executors.newFixedThreadPool(
msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
registerMessageHandlers(executors);
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
private void registerMessageHandlers(ExecutorService executor) {
clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
this::notifyDelegate, executor);
}
@Override
public void storeMeter(Meter meter) {
NodeId master = mastershipService.getMasterFor(meter.deviceId());
meters.put(meter.id(), meter);
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
new MeterOperation(meter, MeterOperation.Type.ADD));
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to install meter {} because {} on {}",
meter, error, master);
// notify app of failure
meter.context().ifPresent(c -> c.onError(
event.subject(), MeterFailReason.UNKNOWN));
}
});
}
}
@Override
public void deleteMeter(Meter meter) {
NodeId master = mastershipService.getMasterFor(meter.deviceId());
// update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
meters.put(meter.id(), meter);
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
new MeterOperation(meter, MeterOperation.Type.REMOVE));
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to delete meter {} because {} on {}",
meter, error, master);
// notify app of failure
meter.context().ifPresent(c -> c.onError(
event.subject(), MeterFailReason.UNKNOWN));
}
});
}
}
@Override
public void updateMeter(Meter meter) {
NodeId master = mastershipService.getMasterFor(meter.deviceId());
meters.put(meter.id(), meter);
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
new MeterOperation(meter, MeterOperation.Type.MODIFY));
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to update meter {} because {} on {}",
meter, error, master);
// notify app of failure
meter.context().ifPresent(c -> c.onError(
event.subject(), MeterFailReason.UNKNOWN));
}
});
}
}
@Override
public void updateMeterState(Meter meter) {
meters.compute(meter.id(), (id, v) -> {
DefaultMeter m = (DefaultMeter) v;
m.setState(meter.state());
m.setProcessedPackets(meter.packetsSeen());
m.setProcessedBytes(meter.bytesSeen());
m.setLife(meter.life());
m.setReferenceCount(meter.referenceCount());
return m;
});
}
@Override
public Meter getMeter(MeterId meterId) {
return meters.get(meterId).value();
}
@Override
public Collection<Meter> getAllMeters() {
return meters.values().stream()
.map(v -> v.value()).collect(Collectors.toSet());
}
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
meters.remove(op.meter().id());
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to delete failed meter {} because {} on {}",
op.meter(), error, master);
// Can't do any more...
}
});
}
}
}