[ONOS-3504] Initial implementation of control message aggregation
This commit implements control message collection feature in
OpenFlow message provider.
Change-Id: I2a3ed2e5edbe1f39b503bb74a10259026b806513
diff --git a/providers/openflow/message/app.xml b/providers/openflow/message/app.xml
new file mode 100644
index 0000000..ff889ae
--- /dev/null
+++ b/providers/openflow/message/app.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2016 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<app name="org.onosproject.openflow-message" origin="ON.Lab" version="${project.version}"
+ category="default" url="http://onosproject.org"
+ featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
+ features="${project.artifactId}">
+ <description>${project.description}</description>
+ <artifact>mvn:${project.groupId}/onos-of-provider-message/${project.version}</artifact>
+</app>
diff --git a/providers/openflow/message/features.xml b/providers/openflow/message/features.xml
new file mode 100644
index 0000000..6b92a98
--- /dev/null
+++ b/providers/openflow/message/features.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Copyright 2016 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+ <feature name="${project.artifactId}" version="${project.version}"
+ description="${project.description}">
+ <feature>onos-api</feature>
+ <bundle>mvn:${project.groupId}/onos-of-provider-message/${project.version}</bundle>
+ </feature>
+</features>
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageAggregator.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageAggregator.java
new file mode 100644
index 0000000..0ce4ce7
--- /dev/null
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageAggregator.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.of.message.impl;
+
+import com.codahale.metrics.Meter;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onlab.metrics.MetricsComponent;
+import org.onlab.metrics.MetricsFeature;
+import org.onlab.metrics.MetricsService;
+import org.onosproject.cpman.ControlMessage;
+import org.onosproject.cpman.DefaultControlMessage;
+import org.onosproject.cpman.message.ControlMessageProviderService;
+import org.onosproject.net.DeviceId;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.provider.of.message.impl.OpenFlowControlMessageMapper.lookupControlMessageType;
+
+/**
+ * Collects the OpenFlow messages and aggregates using MetricsService.
+ */
+public class OpenFlowControlMessageAggregator implements Runnable {
+
+ private static final Set<OFType> OF_TYPE_SET =
+ ImmutableSet.of(OFType.PACKET_IN, OFType.PACKET_OUT, OFType.FLOW_MOD,
+ OFType.FLOW_REMOVED, OFType.STATS_REQUEST, OFType.STATS_REPLY);
+
+ private final Map<OFType, Meter> rateMeterMap = Maps.newHashMap();
+ private final Map<OFType, Meter> countMeterMap = Maps.newHashMap();
+
+ private final DeviceId deviceId;
+ private final ControlMessageProviderService providerService;
+
+ private static final String RATE_NAME = "rate";
+ private static final String COUNT_NAME = "count";
+
+ private Collection<ControlMessage> controlMessages = new ArrayList<>();
+
+ // TODO: this needs to be configurable
+ private static final int EXECUTE_PERIOD_IN_SECOND = 60;
+
+ /**
+ * Generates an OpenFlow message aggregator instance.
+ * The instance is for aggregating a specific OpenFlow message
+ * type of an OpenFlow switch.
+ *
+ * @param metricsService metrics service reference object
+ * @param providerService control message provider service reference object
+ * @param deviceId device identification
+ */
+ public OpenFlowControlMessageAggregator(MetricsService metricsService,
+ ControlMessageProviderService providerService,
+ DeviceId deviceId) {
+ MetricsComponent mc = metricsService.registerComponent(deviceId.toString());
+
+ OF_TYPE_SET.forEach(type -> {
+ MetricsFeature metricsFeature = mc.registerFeature(type.toString());
+ Meter rateMeter = metricsService.createMeter(mc, metricsFeature, RATE_NAME);
+ Meter countMeter = metricsService.createMeter(mc, metricsFeature, COUNT_NAME);
+ rateMeterMap.put(type, rateMeter);
+ countMeterMap.put(type, countMeter);
+ });
+
+ this.deviceId = deviceId;
+ this.providerService = providerService;
+ }
+
+ /**
+ * Increments the meter rate by n, and the meter count by 1.
+ *
+ * @param msg OpenFlow message
+ */
+ public void increment(OFMessage msg) {
+ rateMeterMap.get(msg.getType()).mark(msg.toString().length());
+ countMeterMap.get(msg.getType()).mark(1);
+ }
+
+ @Override
+ public void run() {
+ // update 1 minute statistic information of all control messages
+ OF_TYPE_SET.forEach(type -> controlMessages.add(
+ new DefaultControlMessage(lookupControlMessageType(type),
+ getLoad(type), getRate(type), getCount(type),
+ System.currentTimeMillis())));
+ providerService.updateStatsInfo(deviceId,
+ Collections.unmodifiableCollection(controlMessages));
+ }
+
+ /**
+ * Returns the average load value.
+ *
+ * @param type OpenFlow message type
+ * @return load value
+ */
+ private long getLoad(OFType type) {
+ return (long) rateMeterMap.get(type).getOneMinuteRate() /
+ (long) countMeterMap.get(type).getOneMinuteRate();
+ }
+
+ /**
+ * Returns the average meter rate within recent 1 minute.
+ *
+ * @param type OpenFlow message type
+ * @return rate value
+ */
+ private long getRate(OFType type) {
+ return (long) rateMeterMap.get(type).getOneMinuteRate();
+ }
+
+ /**
+ * Returns the average meter count within recent 1 minute.
+ *
+ * @param type OpenFlow message type
+ * @return count value
+ */
+ private long getCount(OFType type) {
+ return (long) countMeterMap.get(type).getOneMinuteRate() *
+ EXECUTE_PERIOD_IN_SECOND;
+ }
+}
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageMapper.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageMapper.java
new file mode 100644
index 0000000..7347264
--- /dev/null
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageMapper.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.provider.of.message.impl;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.EnumHashBiMap;
+import org.onosproject.cpman.ControlMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+
+import static org.projectfloodlight.openflow.protocol.OFType.*;
+import static org.onosproject.cpman.ControlMessage.Type.*;
+
+/**
+ * Collection of helper methods to convert protocol agnostic control message to
+ * messages used in OpenFlow specification.
+ */
+public final class OpenFlowControlMessageMapper {
+
+ // prohibit instantiation
+ private OpenFlowControlMessageMapper() {
+ }
+
+ private static final BiMap<OFType, ControlMessage.Type> MESSAGE_TYPE =
+ EnumHashBiMap.create(OFType.class);
+
+ static {
+ // key is OpenFlow specific OFType
+ // value is protocol agnostic ControlMessage.Type
+ MESSAGE_TYPE.put(PACKET_IN, INCOMING_PACKET);
+ MESSAGE_TYPE.put(PACKET_OUT, OUTGOING_PACKET);
+ MESSAGE_TYPE.put(FLOW_MOD, FLOW_MOD_PACKET);
+ MESSAGE_TYPE.put(FLOW_REMOVED, FLOW_REMOVED_PACKET);
+ MESSAGE_TYPE.put(STATS_REQUEST, REQUEST_PACKET);
+ MESSAGE_TYPE.put(STATS_REPLY, REPLY_PACKET);
+ }
+
+ /**
+ * Looks up the specified input value to the corresponding value with the specified map.
+ *
+ * @param map bidirectional mapping
+ * @param input input type
+ * @param cls class of output value
+ * @param <I> type of input value
+ * @param <O> type of output value
+ * @return the corresponding value stored in the specified map
+ */
+ private static <I, O> O lookup(BiMap<I, O> map, I input, Class<O> cls) {
+ if (!map.containsKey(input)) {
+ throw new RuntimeException(
+ String.format("No mapping found for %s when converting to %s",
+ input, cls.getName()));
+ }
+ return map.get(input);
+ }
+
+ /**
+ * Looks up the corresponding {@link ControlMessage.Type} instance
+ * from the specified OFType value for OpenFlow message type.
+ *
+ * @param type OpenFlow message type
+ * @return protocol agnostic control message type
+ */
+ public static ControlMessage.Type lookupControlMessageType(OFType type) {
+ return lookup(MESSAGE_TYPE, type, ControlMessage.Type.class);
+ }
+
+ /**
+ * Looks up the corresponding {@link OFType} instance from the specified
+ * ControlMetricType value.
+ *
+ * @param type control message type
+ * @return OpenFlow specific message type
+ */
+ public static OFType lookupOFType(ControlMessage.Type type) {
+ return lookup(MESSAGE_TYPE.inverse(), type, OFType.class);
+ }
+}
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
index d1325ee..8c9e160 100644
--- a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
@@ -15,19 +15,39 @@
*/
package org.onosproject.provider.of.message.impl;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.metrics.MetricsService;
import org.onosproject.cpman.message.ControlMessageProvider;
import org.onosproject.cpman.message.ControlMessageProviderRegistry;
import org.onosproject.cpman.message.ControlMessageProviderService;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
+import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowController;
+import org.onosproject.openflow.controller.OpenFlowEventListener;
+import org.onosproject.openflow.controller.OpenFlowSwitch;
+import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.RoleState;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.slf4j.Logger;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.onosproject.net.DeviceId.deviceId;
+import static org.onosproject.openflow.controller.Dpid.uri;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.groupedThreads;
/**
* Provider which uses an OpenFlow controller to collect control message.
@@ -36,13 +56,34 @@
public class OpenFlowControlMessageProvider extends AbstractProvider
implements ControlMessageProvider {
- private static final Logger LOG = getLogger(OpenFlowControlMessageProvider.class);
+ private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ControlMessageProviderRegistry providerRegistry;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenFlowController controller;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MetricsService metricsService;
+
private ControlMessageProviderService providerService;
+ private final InternalDeviceProvider listener = new InternalDeviceProvider();
+
+ private final InternalIncomingMessageProvider inMsgListener =
+ new InternalIncomingMessageProvider();
+
+ private final InternalOutgoingMessageProvider outMsgListener =
+ new InternalOutgoingMessageProvider();
+
+ private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap();
+ private ScheduledExecutorService executor;
+ private static final int AGGR_INIT_DELAY = 1;
+ private static final int AGGR_PERIOD = 1;
+ private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES;
+ private HashMap<Dpid, ScheduledFuture<?>> executorResults = Maps.newHashMap();
+
/**
* Creates a provider with the supplier identifier.
*/
@@ -53,13 +94,131 @@
@Activate
protected void activate() {
providerService = providerRegistry.register(this);
- LOG.info("Started");
+
+ // listens all OpenFlow device related events
+ controller.addListener(listener);
+
+ // listens all OpenFlow incoming message events
+ controller.addEventListener(inMsgListener);
+ controller.monitorAllEvents(true);
+
+ // listens all OpenFlow outgoing message events
+ controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
+
+ executor = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/provider", "aggregator"));
+
+ connectInitialDevices();
+ log.info("Started");
}
@Deactivate
protected void deactivate() {
+ controller.removeListener(listener);
providerRegistry.unregister(this);
providerService = null;
- LOG.info("Stopped");
+
+ // stops listening all OpenFlow incoming message events
+ controller.monitorAllEvents(false);
+ controller.removeEventListener(inMsgListener);
+
+ // stops listening all OpenFlow outgoing message events
+ controller.getSwitches().forEach(sw -> sw.removeEventListener(outMsgListener));
+
+ log.info("Stopped");
+ }
+
+ private void connectInitialDevices() {
+ for (OpenFlowSwitch sw: controller.getSwitches()) {
+ try {
+ listener.switchAdded(new Dpid(sw.getId()));
+ } catch (Exception e) {
+ log.warn("Failed initially adding {} : {}", sw.getStringId(), e.getMessage());
+ log.debug("Error details:", e);
+ }
+ }
+ }
+
+ /**
+ * A listener for OpenFlow switch event.
+ */
+ private class InternalDeviceProvider implements OpenFlowSwitchListener {
+
+ @Override
+ public void switchAdded(Dpid dpid) {
+ if (providerService == null) {
+ return;
+ }
+
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+ if (sw != null) {
+ // start to monitor the outgoing control messages
+ sw.addEventListener(outMsgListener);
+ }
+
+ DeviceId deviceId = deviceId(uri(dpid));
+ OpenFlowControlMessageAggregator ofcma =
+ new OpenFlowControlMessageAggregator(metricsService,
+ providerService, deviceId);
+ ScheduledFuture result = executor.scheduleAtFixedRate(ofcma,
+ AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT);
+ aggregators.put(dpid, ofcma);
+ executorResults.put(dpid, result);
+ }
+
+ @Override
+ public void switchRemoved(Dpid dpid) {
+ if (providerService == null) {
+ return;
+ }
+
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+ if (sw != null) {
+ // stop monitoring the outgoing control messages
+ sw.removeEventListener(outMsgListener);
+ }
+
+ // removes the aggregator when switch is removed
+ // this also stops the aggregator from running
+ OpenFlowControlMessageAggregator aggregator = aggregators.remove(dpid);
+ if (aggregator != null) {
+ executorResults.get(dpid).cancel(true);
+ executorResults.remove(dpid);
+ }
+ }
+
+ @Override
+ public void switchChanged(Dpid dpid) {
+ }
+
+ @Override
+ public void portChanged(Dpid dpid, OFPortStatus status) {
+ }
+
+ @Override
+ public void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response) {
+ }
+ }
+
+ /**
+ * A listener for incoming OpenFlow messages.
+ */
+ private class InternalIncomingMessageProvider implements OpenFlowEventListener {
+
+ @Override
+ public void handleMessage(Dpid dpid, OFMessage msg) {
+ aggregators.get(dpid).increment(msg);
+ }
+ }
+
+ /**
+ * A listener for outgoing OpenFlow messages.
+ */
+ private class InternalOutgoingMessageProvider implements OpenFlowEventListener {
+
+ @Override
+ public void handleMessage(Dpid dpid, OFMessage msg) {
+ aggregators.get(dpid).increment(msg);
+ }
}
}