[ONOS-3504] Initial implementation of control message aggregation

This commit implements control message collection feature in
OpenFlow message provider.

Change-Id: I2a3ed2e5edbe1f39b503bb74a10259026b806513
diff --git a/providers/openflow/message/app.xml b/providers/openflow/message/app.xml
new file mode 100644
index 0000000..ff889ae
--- /dev/null
+++ b/providers/openflow/message/app.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016 Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<app name="org.onosproject.openflow-message" origin="ON.Lab" version="${project.version}"
+     category="default" url="http://onosproject.org"
+     featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
+     features="${project.artifactId}">
+    <description>${project.description}</description>
+    <artifact>mvn:${project.groupId}/onos-of-provider-message/${project.version}</artifact>
+</app>
diff --git a/providers/openflow/message/features.xml b/providers/openflow/message/features.xml
new file mode 100644
index 0000000..6b92a98
--- /dev/null
+++ b/providers/openflow/message/features.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Copyright 2016 Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+    <feature name="${project.artifactId}" version="${project.version}"
+             description="${project.description}">
+        <feature>onos-api</feature>
+        <bundle>mvn:${project.groupId}/onos-of-provider-message/${project.version}</bundle>
+    </feature>
+</features>
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageAggregator.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageAggregator.java
new file mode 100644
index 0000000..0ce4ce7
--- /dev/null
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageAggregator.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.of.message.impl;
+
+import com.codahale.metrics.Meter;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onlab.metrics.MetricsComponent;
+import org.onlab.metrics.MetricsFeature;
+import org.onlab.metrics.MetricsService;
+import org.onosproject.cpman.ControlMessage;
+import org.onosproject.cpman.DefaultControlMessage;
+import org.onosproject.cpman.message.ControlMessageProviderService;
+import org.onosproject.net.DeviceId;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.provider.of.message.impl.OpenFlowControlMessageMapper.lookupControlMessageType;
+
+/**
+ * Collects the OpenFlow messages and aggregates using MetricsService.
+ */
+public class OpenFlowControlMessageAggregator implements Runnable {
+
+    private static final Set<OFType> OF_TYPE_SET =
+            ImmutableSet.of(OFType.PACKET_IN, OFType.PACKET_OUT, OFType.FLOW_MOD,
+                    OFType.FLOW_REMOVED, OFType.STATS_REQUEST, OFType.STATS_REPLY);
+
+    private final Map<OFType, Meter> rateMeterMap = Maps.newHashMap();
+    private final Map<OFType, Meter> countMeterMap = Maps.newHashMap();
+
+    private final DeviceId deviceId;
+    private final ControlMessageProviderService providerService;
+
+    private static final String RATE_NAME = "rate";
+    private static final String COUNT_NAME = "count";
+
+    private Collection<ControlMessage> controlMessages = new ArrayList<>();
+
+    // TODO: this needs to be configurable
+    private static final int EXECUTE_PERIOD_IN_SECOND = 60;
+
+    /**
+     * Generates an OpenFlow message aggregator instance.
+     * The instance is for aggregating a specific OpenFlow message
+     * type of an OpenFlow switch.
+     *
+     * @param metricsService metrics service reference object
+     * @param providerService control message provider service reference object
+     * @param deviceId device identification
+     */
+    public OpenFlowControlMessageAggregator(MetricsService metricsService,
+                                            ControlMessageProviderService providerService,
+                                            DeviceId deviceId) {
+        MetricsComponent mc = metricsService.registerComponent(deviceId.toString());
+
+        OF_TYPE_SET.forEach(type -> {
+            MetricsFeature metricsFeature = mc.registerFeature(type.toString());
+            Meter rateMeter = metricsService.createMeter(mc, metricsFeature, RATE_NAME);
+            Meter countMeter = metricsService.createMeter(mc, metricsFeature, COUNT_NAME);
+            rateMeterMap.put(type, rateMeter);
+            countMeterMap.put(type, countMeter);
+        });
+
+        this.deviceId = deviceId;
+        this.providerService = providerService;
+    }
+
+    /**
+     * Increments the meter rate by n, and the meter count by 1.
+     *
+     * @param msg OpenFlow message
+     */
+    public void increment(OFMessage msg) {
+        rateMeterMap.get(msg.getType()).mark(msg.toString().length());
+        countMeterMap.get(msg.getType()).mark(1);
+    }
+
+    @Override
+    public void run() {
+        // update 1 minute statistic information of all control messages
+        OF_TYPE_SET.forEach(type -> controlMessages.add(
+                new DefaultControlMessage(lookupControlMessageType(type),
+                        getLoad(type), getRate(type), getCount(type),
+                        System.currentTimeMillis())));
+        providerService.updateStatsInfo(deviceId,
+                Collections.unmodifiableCollection(controlMessages));
+    }
+
+    /**
+     * Returns the average load value.
+     *
+     * @param type OpenFlow message type
+     * @return load value
+     */
+    private long getLoad(OFType type) {
+        return (long) rateMeterMap.get(type).getOneMinuteRate() /
+                (long) countMeterMap.get(type).getOneMinuteRate();
+    }
+
+    /**
+     * Returns the average meter rate within recent 1 minute.
+     *
+     * @param type OpenFlow message type
+     * @return rate value
+     */
+    private long getRate(OFType type) {
+        return (long) rateMeterMap.get(type).getOneMinuteRate();
+    }
+
+    /**
+     * Returns the average meter count within recent 1 minute.
+     *
+     * @param type OpenFlow message type
+     * @return count value
+     */
+    private long getCount(OFType type) {
+        return (long) countMeterMap.get(type).getOneMinuteRate() *
+                EXECUTE_PERIOD_IN_SECOND;
+    }
+}
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageMapper.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageMapper.java
new file mode 100644
index 0000000..7347264
--- /dev/null
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageMapper.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.provider.of.message.impl;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.EnumHashBiMap;
+import org.onosproject.cpman.ControlMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+
+import static org.projectfloodlight.openflow.protocol.OFType.*;
+import static org.onosproject.cpman.ControlMessage.Type.*;
+
+/**
+ * Collection of helper methods to convert protocol agnostic control message to
+ * messages used in OpenFlow specification.
+ */
+public final class OpenFlowControlMessageMapper {
+
+    // prohibit instantiation
+    private OpenFlowControlMessageMapper() {
+    }
+
+    private static final BiMap<OFType, ControlMessage.Type> MESSAGE_TYPE =
+            EnumHashBiMap.create(OFType.class);
+
+    static {
+        // key is OpenFlow specific OFType
+        // value is protocol agnostic ControlMessage.Type
+        MESSAGE_TYPE.put(PACKET_IN, INCOMING_PACKET);
+        MESSAGE_TYPE.put(PACKET_OUT, OUTGOING_PACKET);
+        MESSAGE_TYPE.put(FLOW_MOD, FLOW_MOD_PACKET);
+        MESSAGE_TYPE.put(FLOW_REMOVED, FLOW_REMOVED_PACKET);
+        MESSAGE_TYPE.put(STATS_REQUEST, REQUEST_PACKET);
+        MESSAGE_TYPE.put(STATS_REPLY, REPLY_PACKET);
+    }
+
+    /**
+     * Looks up the specified input value to the corresponding value with the specified map.
+     *
+     * @param map   bidirectional mapping
+     * @param input input type
+     * @param cls   class of output value
+     * @param <I>   type of input value
+     * @param <O>   type of output value
+     * @return the corresponding value stored in the specified map
+     */
+    private static <I, O> O lookup(BiMap<I, O> map, I input, Class<O> cls) {
+        if (!map.containsKey(input)) {
+            throw new RuntimeException(
+                    String.format("No mapping found for %s when converting to %s",
+                            input, cls.getName()));
+        }
+        return map.get(input);
+    }
+
+    /**
+     * Looks up the corresponding {@link ControlMessage.Type} instance
+     * from the specified OFType value for OpenFlow message type.
+     *
+     * @param type OpenFlow message type
+     * @return protocol agnostic control message type
+     */
+    public static ControlMessage.Type lookupControlMessageType(OFType type) {
+        return lookup(MESSAGE_TYPE, type, ControlMessage.Type.class);
+    }
+
+    /**
+     * Looks up the corresponding {@link OFType} instance from the specified
+     * ControlMetricType value.
+     *
+     * @param type control message type
+     * @return OpenFlow specific message type
+     */
+    public static OFType lookupOFType(ControlMessage.Type type) {
+        return lookup(MESSAGE_TYPE.inverse(), type, OFType.class);
+    }
+}
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
index d1325ee..8c9e160 100644
--- a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
@@ -15,19 +15,39 @@
  */
 package org.onosproject.provider.of.message.impl;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.metrics.MetricsService;
 import org.onosproject.cpman.message.ControlMessageProvider;
 import org.onosproject.cpman.message.ControlMessageProviderRegistry;
 import org.onosproject.cpman.message.ControlMessageProviderService;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowController;
+import org.onosproject.openflow.controller.OpenFlowEventListener;
+import org.onosproject.openflow.controller.OpenFlowSwitch;
+import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.RoleState;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.slf4j.Logger;
 
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.onosproject.net.DeviceId.deviceId;
+import static org.onosproject.openflow.controller.Dpid.uri;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.groupedThreads;
 
 /**
  * Provider which uses an OpenFlow controller to collect control message.
@@ -36,13 +56,34 @@
 public class OpenFlowControlMessageProvider extends AbstractProvider
         implements ControlMessageProvider {
 
-    private static final Logger LOG = getLogger(OpenFlowControlMessageProvider.class);
+    private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ControlMessageProviderRegistry providerRegistry;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenFlowController controller;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MetricsService metricsService;
+
     private ControlMessageProviderService providerService;
 
+    private final InternalDeviceProvider listener = new InternalDeviceProvider();
+
+    private final InternalIncomingMessageProvider inMsgListener =
+                    new InternalIncomingMessageProvider();
+
+    private final InternalOutgoingMessageProvider outMsgListener =
+                    new InternalOutgoingMessageProvider();
+
+    private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap();
+    private ScheduledExecutorService executor;
+    private static final int AGGR_INIT_DELAY = 1;
+    private static final int AGGR_PERIOD = 1;
+    private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES;
+    private HashMap<Dpid, ScheduledFuture<?>> executorResults = Maps.newHashMap();
+
     /**
      * Creates a provider with the supplier identifier.
      */
@@ -53,13 +94,131 @@
     @Activate
     protected void activate() {
         providerService = providerRegistry.register(this);
-        LOG.info("Started");
+
+        // listens all OpenFlow device related events
+        controller.addListener(listener);
+
+        // listens all OpenFlow incoming message events
+        controller.addEventListener(inMsgListener);
+        controller.monitorAllEvents(true);
+
+        // listens all OpenFlow outgoing message events
+        controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
+
+        executor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/provider", "aggregator"));
+
+        connectInitialDevices();
+        log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        controller.removeListener(listener);
         providerRegistry.unregister(this);
         providerService = null;
-        LOG.info("Stopped");
+
+        // stops listening all OpenFlow incoming message events
+        controller.monitorAllEvents(false);
+        controller.removeEventListener(inMsgListener);
+
+        // stops listening all OpenFlow outgoing message events
+        controller.getSwitches().forEach(sw -> sw.removeEventListener(outMsgListener));
+
+        log.info("Stopped");
+    }
+
+    private void connectInitialDevices() {
+        for (OpenFlowSwitch sw: controller.getSwitches()) {
+            try {
+                listener.switchAdded(new Dpid(sw.getId()));
+            } catch (Exception e) {
+                log.warn("Failed initially adding {} : {}", sw.getStringId(), e.getMessage());
+                log.debug("Error details:", e);
+            }
+        }
+    }
+
+    /**
+     * A listener for OpenFlow switch event.
+     */
+    private class InternalDeviceProvider implements OpenFlowSwitchListener {
+
+        @Override
+        public void switchAdded(Dpid dpid) {
+            if (providerService == null) {
+                return;
+            }
+
+            OpenFlowSwitch sw = controller.getSwitch(dpid);
+            if (sw != null) {
+                // start to monitor the outgoing control messages
+                sw.addEventListener(outMsgListener);
+            }
+
+            DeviceId deviceId = deviceId(uri(dpid));
+            OpenFlowControlMessageAggregator ofcma =
+                    new OpenFlowControlMessageAggregator(metricsService,
+                            providerService, deviceId);
+            ScheduledFuture result = executor.scheduleAtFixedRate(ofcma,
+                    AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT);
+            aggregators.put(dpid, ofcma);
+            executorResults.put(dpid, result);
+        }
+
+        @Override
+        public void switchRemoved(Dpid dpid) {
+            if (providerService == null) {
+                return;
+            }
+
+            OpenFlowSwitch sw = controller.getSwitch(dpid);
+            if (sw != null) {
+                // stop monitoring the outgoing control messages
+                sw.removeEventListener(outMsgListener);
+            }
+
+            // removes the aggregator when switch is removed
+            // this also stops the aggregator from running
+            OpenFlowControlMessageAggregator aggregator = aggregators.remove(dpid);
+            if (aggregator != null) {
+                executorResults.get(dpid).cancel(true);
+                executorResults.remove(dpid);
+            }
+        }
+
+        @Override
+        public void switchChanged(Dpid dpid) {
+        }
+
+        @Override
+        public void portChanged(Dpid dpid, OFPortStatus status) {
+        }
+
+        @Override
+        public void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response) {
+        }
+    }
+
+    /**
+     * A listener for incoming OpenFlow messages.
+     */
+    private class InternalIncomingMessageProvider implements OpenFlowEventListener {
+
+        @Override
+        public void handleMessage(Dpid dpid, OFMessage msg) {
+            aggregators.get(dpid).increment(msg);
+        }
+    }
+
+    /**
+     * A listener for outgoing OpenFlow messages.
+     */
+    private class InternalOutgoingMessageProvider implements OpenFlowEventListener {
+
+        @Override
+        public void handleMessage(Dpid dpid, OFMessage msg) {
+            aggregators.get(dpid).increment(msg);
+        }
     }
 }