Fix for ONOS-5841

Change-Id: I562abbce373e53ef0419fe02bec950ec8f857b69
diff --git a/apps/fwd/BUCK b/apps/fwd/BUCK
index 2c69a19..0e712b9 100644
--- a/apps/fwd/BUCK
+++ b/apps/fwd/BUCK
@@ -1,5 +1,10 @@
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
+    '//core/store/serializers:onos-core-serializers',
+    '//core/store/primitives:onos-core-primitives',
+    '//core/api:onos-api',
+    '//lib:org.apache.karaf.shell.console',
+    '//cli:onos-cli',
 ]
 
 osgi_jar_with_tests (
diff --git a/apps/fwd/pom.xml b/apps/fwd/pom.xml
index 2e9e473..1224904 100644
--- a/apps/fwd/pom.xml
+++ b/apps/fwd/pom.xml
@@ -43,6 +43,30 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.compendium</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-primitives</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.console</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/apps/fwd/src/main/java/org/onosproject/fwd/MacAddressCompleter.java b/apps/fwd/src/main/java/org/onosproject/fwd/MacAddressCompleter.java
new file mode 100644
index 0000000..fc5ff3b
--- /dev/null
+++ b/apps/fwd/src/main/java/org/onosproject/fwd/MacAddressCompleter.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.fwd;
+import org.apache.karaf.shell.console.Completer;
+import org.apache.karaf.shell.console.completer.StringsCompleter;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import java.util.List;
+import java.util.SortedSet;
+import org.onlab.packet.MacAddress;
+
+/**
+ * Sample reactive forwarding application.
+ */
+public class MacAddressCompleter implements Completer {
+    @Override
+    public int complete(String buffer, int cursor, List<String> candidates) {
+        // Delegate string completer
+        StringsCompleter delegate = new StringsCompleter();
+        EventuallyConsistentMap<MacAddress, ReactiveForwardMetrics> macAddress;
+        // Fetch our service and feed it's offerings to the string completer
+        ReactiveForwarding reactiveForwardingService = AbstractShellCommand.get(ReactiveForwarding.class);
+        macAddress = reactiveForwardingService.getMacAddress();
+        SortedSet<String> strings = delegate.getStrings();
+        for (MacAddress key : macAddress.keySet()) {
+            strings.add(key.toString());
+        }
+        // Now let the completer do the work for figuring out what to offer.
+        return delegate.complete(buffer, cursor, candidates);
+    }
+}
diff --git a/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwardMetrics.java b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwardMetrics.java
new file mode 100644
index 0000000..02cf6d9
--- /dev/null
+++ b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwardMetrics.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.fwd;
+
+import org.onlab.packet.MacAddress;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Sample reactive forwarding application.
+ */
+public class ReactiveForwardMetrics {
+    private Long replyPacket = null;
+    private Long inPacket = null;
+    private Long droppedPacket = null;
+    private Long forwardedPacket = null;
+    private MacAddress macAddress;
+
+    ReactiveForwardMetrics(Long replyPacket, Long inPacket, Long droppedPacket,
+                           Long forwardedPacket, MacAddress macAddress) {
+        this.replyPacket = replyPacket;
+        this.inPacket = inPacket;
+        this.droppedPacket = droppedPacket;
+        this.forwardedPacket = forwardedPacket;
+        this.macAddress = macAddress;
+    }
+
+    public void incremnetReplyPacket() {
+        replyPacket++;
+
+    }
+
+    public void incrementInPacket() {
+        inPacket++;
+    }
+
+    public void incrementDroppedPacket() {
+        droppedPacket++;
+    }
+
+    public void incrementForwardedPacket() {
+        forwardedPacket++;
+    }
+
+    public MacAddress getMacAddress() {
+        return macAddress;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("inpktCounter ", inPacket)
+            .add("replypktCounter ", replyPacket)
+            .add("forwardpktCounter ", forwardedPacket)
+            .add("droppktCounter ", droppedPacket).toString();
+    }
+}
diff --git a/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java
index fc7b346..16cef77 100644
--- a/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java
+++ b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.ICMP;
@@ -35,6 +36,7 @@
 import org.onlab.packet.TpPort;
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -71,9 +73,13 @@
 import org.onosproject.net.topology.TopologyEvent;
 import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyService;
+import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.service.MultiValuedTimestamp;
 import org.slf4j.Logger;
-
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.List;
@@ -87,6 +93,7 @@
  * Sample reactive forwarding application.
  */
 @Component(immediate = true)
+@Service(value = ReactiveForwarding.class)
 public class ReactiveForwarding {
 
     private static final int DEFAULT_TIMEOUT = 10;
@@ -115,8 +122,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
     private ReactivePacketProcessor processor = new ReactivePacketProcessor();
 
+    private  EventuallyConsistentMap<MacAddress, ReactiveForwardMetrics> metrics;
+
     private ApplicationId appId;
 
     @Property(name = "packetOutOnly", boolValue = false,
@@ -180,11 +192,26 @@
             label = "Ignore (do not forward) IPv4 multicast packets; default is false")
     private boolean ignoreIpv4McastPackets = false;
 
+    @Property(name = "recordMetrics", boolValue = false,
+            label = "Enable record metrics for reactive forwarding")
+    private boolean recordMetrics = false;
+
     private final TopologyListener topologyListener = new InternalTopologyListener();
 
 
     @Activate
     public void activate(ComponentContext context) {
+        KryoNamespace.Builder metricSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(ReactiveForwardMetrics.class)
+                .register(MultiValuedTimestamp.class);
+        metrics =  storageService.<MacAddress, ReactiveForwardMetrics>eventuallyConsistentMapBuilder()
+                .withName("metrics-fwd")
+                .withSerializer(metricSerializer)
+                .withTimestampProvider((key, metricsData) ->new
+                        MultiValuedTimestamp<>(new WallClockTimestamp(), System.nanoTime()))
+                .build();
+
         cfgService.registerProperties(getClass());
         appId = coreService.registerApplication("org.onosproject.fwd");
 
@@ -383,6 +410,17 @@
             log.info("Configured. Ignore IPv4 multicast packets is {}",
                     ignoreIpv4McastPackets ? "enabled" : "disabled");
         }
+        Boolean recordMetricsEnabled =
+                Tools.isPropertyEnabled(properties, "recordMetrics");
+        if (recordMetricsEnabled == null) {
+            log.info("IConfigured. Ignore record metrics  is {} ," +
+                    "using current value of {}", recordMetrics);
+        } else {
+            recordMetrics = recordMetricsEnabled;
+            log.info("Configured. record metrics  is {}",
+                    recordMetrics ? "enabled" : "disabled");
+        }
+
         flowTimeout = Tools.getIntegerProperty(properties, "flowTimeout", DEFAULT_TIMEOUT);
         log.info("Configured. Flow Timeout is configured to {} seconds", flowTimeout);
 
@@ -411,13 +449,20 @@
                 return;
             }
 
+            MacAddress macAddress = ethPkt.getSourceMAC();
+            ReactiveForwardMetrics macMetrics = null;
+            macMetrics = createCounter(macAddress);
+            inPacket(macMetrics);
+
             // Bail if this is deemed to be a control packet.
             if (isControlPacket(ethPkt)) {
+                droppedPacket(macMetrics);
                 return;
             }
 
             // Skip IPv6 multicast packet when IPv6 forward is disabled.
             if (!ipv6Forwarding && isIpv6Multicast(ethPkt)) {
+                droppedPacket(macMetrics);
                 return;
             }
 
@@ -425,6 +470,7 @@
 
             // Do not process link-local addresses in any way.
             if (id.mac().isLinkLocal()) {
+                droppedPacket(macMetrics);
                 return;
             }
 
@@ -438,7 +484,7 @@
             // Do we know who this is for? If not, flood and bail.
             Host dst = hostService.getHost(id);
             if (dst == null) {
-                flood(context);
+                flood(context, macMetrics);
                 return;
             }
 
@@ -446,7 +492,7 @@
             // simply forward out to the destination and bail.
             if (pkt.receivedFrom().deviceId().equals(dst.location().deviceId())) {
                 if (!context.inPacket().receivedFrom().port().equals(dst.location().port())) {
-                    installRule(context, dst.location().port());
+                    installRule(context, dst.location().port(), macMetrics);
                 }
                 return;
             }
@@ -459,7 +505,7 @@
                                              dst.location().deviceId());
             if (paths.isEmpty()) {
                 // If there are no paths, flood and bail.
-                flood(context);
+                flood(context, macMetrics);
                 return;
             }
 
@@ -469,12 +515,12 @@
             if (path == null) {
                 log.warn("Don't know where to go from here {} for {} -> {}",
                          pkt.receivedFrom(), ethPkt.getSourceMAC(), ethPkt.getDestinationMAC());
-                flood(context);
+                flood(context, macMetrics);
                 return;
             }
 
             // Otherwise forward and be done with it.
-            installRule(context, path.src().port());
+            installRule(context, path.src().port(), macMetrics);
         }
 
     }
@@ -504,23 +550,24 @@
     }
 
     // Floods the specified packet if permissible.
-    private void flood(PacketContext context) {
+    private void flood(PacketContext context, ReactiveForwardMetrics macMetrics) {
         if (topologyService.isBroadcastPoint(topologyService.currentTopology(),
                                              context.inPacket().receivedFrom())) {
-            packetOut(context, PortNumber.FLOOD);
+            packetOut(context, PortNumber.FLOOD, macMetrics);
         } else {
             context.block();
         }
     }
 
     // Sends a packet out the specified port.
-    private void packetOut(PacketContext context, PortNumber portNumber) {
+    private void packetOut(PacketContext context, PortNumber portNumber, ReactiveForwardMetrics macMetrics) {
+        replyPacket(macMetrics);
         context.treatmentBuilder().setOutput(portNumber);
         context.send();
     }
 
     // Install a rule forwarding the packet to the specified port.
-    private void installRule(PacketContext context, PortNumber portNumber) {
+    private void installRule(PacketContext context, PortNumber portNumber, ReactiveForwardMetrics macMetrics) {
         //
         // We don't support (yet) buffer IDs in the Flow Service so
         // packet out first.
@@ -530,7 +577,7 @@
 
         // If PacketOutOnly or ARP packet than forward directly to output port
         if (packetOutOnly || inPkt.getEtherType() == Ethernet.TYPE_ARP) {
-            packetOut(context, portNumber);
+            packetOut(context, portNumber, macMetrics);
             return;
         }
 
@@ -651,7 +698,7 @@
 
         flowObjectiveService.forward(context.inPacket().receivedFrom().deviceId(),
                                      forwardingObjective);
-
+        forwardPacket(macMetrics);
         //
         // If packetOutOfppTable
         //  Send packet back to the OpenFlow pipeline to match installed flow
@@ -659,12 +706,13 @@
         //  Send packet direction on the appropriate port
         //
         if (packetOutOfppTable) {
-            packetOut(context, PortNumber.TABLE);
+            packetOut(context, PortNumber.TABLE, macMetrics);
         } else {
-            packetOut(context, portNumber);
+            packetOut(context, portNumber, macMetrics);
         }
     }
 
+
     private class InternalTopologyListener implements TopologyListener {
         @Override
         public void event(TopologyEvent event) {
@@ -784,8 +832,64 @@
         return builder.build();
     }
 
-    // Returns set of flow entries which were created by this application and
-    // which egress from the specified connection port
+    private ReactiveForwardMetrics createCounter(MacAddress macAddress) {
+        ReactiveForwardMetrics macMetrics = null;
+        if (recordMetrics) {
+            macMetrics = metrics.compute(macAddress, (key, existingValue) -> {
+                if (existingValue == null) {
+                    return new ReactiveForwardMetrics(0L, 0L, 0L, 0L, macAddress);
+                } else {
+                    return existingValue;
+                }
+            });
+        }
+        return macMetrics;
+    }
+
+    private void  forwardPacket(ReactiveForwardMetrics macmetrics) {
+        if (recordMetrics) {
+            macmetrics.incrementForwardedPacket();
+            metrics.put(macmetrics.getMacAddress(), macmetrics);
+        }
+    }
+
+    private void inPacket(ReactiveForwardMetrics macmetrics) {
+        if (recordMetrics) {
+            macmetrics.incrementInPacket();
+            metrics.put(macmetrics.getMacAddress(), macmetrics);
+        }
+    }
+
+    private void replyPacket(ReactiveForwardMetrics macmetrics) {
+        if (recordMetrics) {
+            macmetrics.incremnetReplyPacket();
+            metrics.put(macmetrics.getMacAddress(), macmetrics);
+        }
+    }
+
+    private void droppedPacket(ReactiveForwardMetrics macmetrics) {
+        if (recordMetrics) {
+            macmetrics.incrementDroppedPacket();
+            metrics.put(macmetrics.getMacAddress(), macmetrics);
+        }
+    }
+
+    public EventuallyConsistentMap<MacAddress, ReactiveForwardMetrics> getMacAddress() {
+        return metrics;
+    }
+
+    public void printMetric(MacAddress mac) {
+        System.out.println("-----------------------------------------------------------------------------------------");
+        System.out.println(" MACADDRESS \t\t\t\t\t\t Metrics");
+        if (mac != null) {
+            System.out.println(" " + mac + " \t\t\t " + metrics.get(mac));
+        } else {
+            for (MacAddress key : metrics.keySet()) {
+                System.out.println(" " + key + " \t\t\t " + metrics.get(key));
+            }
+        }
+    }
+
     private Set<FlowEntry> getFlowRulesFrom(ConnectPoint egress) {
         ImmutableSet.Builder<FlowEntry> builder = ImmutableSet.builder();
         flowRuleService.getFlowEntries(egress.deviceId()).forEach(r -> {
diff --git a/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwardingCommand.java b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwardingCommand.java
new file mode 100644
index 0000000..0f6641c
--- /dev/null
+++ b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwardingCommand.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.fwd;
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.commands.Argument;
+import org.onlab.packet.MacAddress;
+
+/**
+ * Sample reactive forwarding application.
+ */
+@Command(scope = "onos", name = "reactive-fwd-metrics",
+        description = "List all the metrics of reactive fwd app based on mac address")
+public class ReactiveForwardingCommand extends AbstractShellCommand {
+    @Argument(index = 0, name = "mac", description = "One Mac Address",
+            required = false, multiValued = false)
+    String mac = null;
+    @Override
+    protected void execute() {
+        ReactiveForwarding reactiveForwardingService = AbstractShellCommand.get(ReactiveForwarding.class);
+        MacAddress macAddress = null;
+        if (mac != null) {
+            macAddress = MacAddress.valueOf(mac);
+        }
+        reactiveForwardingService.printMetric(macAddress);
+    }
+}
diff --git a/apps/fwd/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/fwd/src/main/resources/OSGI-INF/blueprint/shell-config.xml
new file mode 100644
index 0000000..fb321cb
--- /dev/null
+++ b/apps/fwd/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -0,0 +1,27 @@
+<!--
++  ~ Copyright 2017-present Open Networking Laboratory
++  ~
++  ~ Licensed under the Apache License, Version 2.0 (the "License");
++  ~ you may not use this file except in compliance with the License.
++  ~ You may obtain a copy of the License at
++  ~
++  ~     http://www.apache.org/licenses/LICENSE-2.0
++  ~
++  ~ Unless required by applicable law or agreed to in writing, software
++  ~ distributed under the License is distributed on an "AS IS" BASIS,
++  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++  ~ See the License for the specific language governing permissions and
++  ~ limitations under the License.
++  -->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
+        <command>
+            <action class="org.onosproject.fwd.ReactiveForwardingCommand"/>
+            <completers>
+            <ref component-id="MacAddressCompleter"/>
+            </completers>
+        </command>
+    </command-bundle>
+    <bean id="MacAddressCompleter" class="org.onosproject.fwd.MacAddressCompleter"/>
+</blueprint>
+