[ONOS-7683] Add JSON codecs for FlowInfo and StatsInfo with tests

Change-Id: I12560e5f72e704a72a59465de347ddb32051aabb
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 063e928..4696a55 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -21,6 +21,8 @@
     '//lib:TEST_ADAPTERS',
     '//core/api:onos-api-tests',
     '//core/common:onos-core-common-tests',
+    '//web/api:onos-rest-tests',
+    '//lib:TEST_REST',
 ]
 
 osgi_jar_with_tests (
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 7634310..841ce2e 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -56,6 +56,24 @@
         </dependency>
 
         <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.onosproject</groupId>
             <artifactId>onos-apps-openstacktelemetry-api</artifactId>
             <version>${project.version}</version>
@@ -120,6 +138,79 @@
             <artifactId>commons-io</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework</groupId>
+            <artifactId>jersey-test-framework-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-jetty</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-rest</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava-testlib</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodec.java
new file mode 100644
index 0000000..4244c7f
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodec.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.net.DeviceId;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.onosproject.openstacktelemetry.impl.DefaultFlowInfo;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Openstack telemetry codec used for serializing and de-serializing JSON string.
+ */
+public final class FlowInfoJsonCodec extends JsonCodec<FlowInfo> {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String FLOW_TYPE = "flowType";
+    private static final String DEVICE_ID = "deviceId";
+    private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
+    private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
+
+    private static final String VLAN_ID = "vlanId";
+    private static final String VXLAN_ID = "vxlanId";
+    private static final String SRC_IP = "srcIp";
+    private static final String SRC_IP_PREFIX_LEN = "srcIpPrefixLength";
+    private static final String DST_IP = "dstIp";
+    private static final String DST_IP_PREFIX_LEN = "dstIpPrefixLength";
+    private static final String SRC_PORT = "srcPort";
+    private static final String DST_PORT = "dstPort";
+    private static final String PROTOCOL = "protocol";
+    private static final String SRC_MAC = "srcMac";
+    private static final String DST_MAC = "dstMac";
+    private static final String STATS_INFO = "statsInfo";
+
+    @Override
+    public ObjectNode encode(FlowInfo info, CodecContext context) {
+        checkNotNull(info, "FlowInfo cannot be null");
+
+        ObjectNode result = context.mapper().createObjectNode()
+                .put(FLOW_TYPE, info.flowType())
+                .put(DEVICE_ID, info.deviceId().toString())
+                .put(INPUT_INTERFACE_ID, info.inputInterfaceId())
+                .put(OUTPUT_INTERFACE_ID, info.outputInterfaceId())
+                .put(SRC_IP, info.srcIp().address().toString())
+                .put(SRC_IP_PREFIX_LEN, info.srcIp().prefixLength())
+                .put(DST_IP, info.dstIp().address().toString())
+                .put(DST_IP_PREFIX_LEN, info.dstIp().prefixLength())
+                .put(SRC_PORT, info.srcPort().toString())
+                .put(DST_PORT, info.dstPort().toString())
+                .put(PROTOCOL, info.protocol())
+                .put(SRC_MAC, info.srcMac().toString())
+                .put(DST_MAC, info.dstMac().toString());
+
+
+        if (info.vlanId() != null) {
+            result.put(VLAN_ID, info.vlanId().toString());
+        } else {
+            result.put(VXLAN_ID, info.vxlanId());
+        }
+
+        ObjectNode statsInfoJson =
+                context.codec(StatsInfo.class).encode(info.statsInfo(), context);
+
+        result.put(STATS_INFO, statsInfoJson);
+
+        return result;
+    }
+
+    @Override
+    public FlowInfo decode(ObjectNode json, CodecContext context) {
+        if (json == null || !json.isObject()) {
+            return null;
+        }
+
+        String flowType = json.get(FLOW_TYPE).asText();
+        String deviceId = json.get(DEVICE_ID).asText();
+        int inputInterfaceId = json.get(INPUT_INTERFACE_ID).asInt();
+        int outputInterfaceId = json.get(OUTPUT_INTERFACE_ID).asInt();
+        String srcIp = json.get(SRC_IP).asText();
+        int srcIpPrefixLength = json.get(SRC_IP_PREFIX_LEN).asInt();
+        String dstIp = json.get(DST_IP).asText();
+        int dstIpPrefixLength = json.get(DST_IP_PREFIX_LEN).asInt();
+        int srcPort = json.get(SRC_PORT).asInt();
+        int dstPort = json.get(DST_PORT).asInt();
+        String protocol = json.get(PROTOCOL).asText();
+        String srcMac = json.get(SRC_MAC).asText();
+        String dstMac = json.get(DST_MAC).asText();
+
+        JsonNode statsInfoJson = json.get(STATS_INFO);
+
+        JsonCodec<StatsInfo> statsInfoCodec = context.codec(StatsInfo.class);
+        StatsInfo statsInfo = statsInfoCodec.decode((ObjectNode) statsInfoJson.deepCopy(), context);
+
+        return new DefaultFlowInfo.DefaultBuilder()
+                .withFlowType(Byte.valueOf(flowType))
+                .withDeviceId(DeviceId.deviceId(deviceId))
+                .withInputInterfaceId(inputInterfaceId)
+                .withOutputInterfaceId(outputInterfaceId)
+                .withSrcIp(IpPrefix.valueOf(IpAddress.valueOf(srcIp), srcIpPrefixLength))
+                .withDstIp(IpPrefix.valueOf(IpAddress.valueOf(dstIp), dstIpPrefixLength))
+                .withSrcPort(TpPort.tpPort(srcPort))
+                .withDstPort(TpPort.tpPort(dstPort))
+                .withProtocol(Byte.valueOf(protocol))
+                .withSrcMac(MacAddress.valueOf(srcMac))
+                .withDstMac(MacAddress.valueOf(dstMac))
+                .withStatsInfo(statsInfo)
+                .build();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsInfoJsonCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsInfoJsonCodec.java
new file mode 100644
index 0000000..53cf924
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsInfoJsonCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.onosproject.openstacktelemetry.impl.DefaultStatsInfo;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Openstack telemetry codec used for serializing and de-serializing JSON string.
+ */
+public final class StatsInfoJsonCodec extends JsonCodec<StatsInfo> {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String STARTUP_TIME = "startupTime";
+    private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
+    private static final String LST_PKT_OFFSET = "lstPktOffset";
+    private static final String PREV_ACC_BYTES = "prevAccBytes";
+    private static final String PREV_ACC_PKTS = "prevAccPkts";
+    private static final String CURR_ACC_BYTES = "currAccBytes";
+    private static final String CURR_ACC_PKTS = "currAccPkts";
+    private static final String ERROR_PKTS = "errorPkts";
+    private static final String DROP_PKTS = "dropPkts";
+
+    @Override
+    public ObjectNode encode(StatsInfo info, CodecContext context) {
+        checkNotNull(info, "StatsInfo cannot be null");
+
+        return context.mapper().createObjectNode()
+                .put(STARTUP_TIME, info.startupTime())
+                .put(FST_PKT_ARR_TIME, info.fstPktArrTime())
+                .put(LST_PKT_OFFSET, info.lstPktOffset())
+                .put(PREV_ACC_BYTES, info.prevAccBytes())
+                .put(PREV_ACC_PKTS, info.prevAccPkts())
+                .put(CURR_ACC_BYTES, info.prevAccBytes())
+                .put(CURR_ACC_PKTS, info.prevAccPkts())
+                .put(ERROR_PKTS, info.errorPkts())
+                .put(DROP_PKTS, info.dropPkts());
+    }
+
+    @Override
+    public StatsInfo decode(ObjectNode json, CodecContext context) {
+        if (json == null || !json.isObject()) {
+            return null;
+        }
+
+        return new DefaultStatsInfo.DefaultBuilder()
+                .withStartupTime(json.get(STARTUP_TIME).asLong())
+                .withFstPktArrTime(json.get(FST_PKT_ARR_TIME).asLong())
+                .withLstPktOffset(json.get(LST_PKT_OFFSET).asInt())
+                .withPrevAccBytes(json.get(PREV_ACC_BYTES).asLong())
+                .withPrevAccPkts(json.get(PREV_ACC_PKTS).asInt())
+                .withCurrAccBytes(json.get(CURR_ACC_BYTES).asLong())
+                .withCurrAccPkts(json.get(CURR_ACC_PKTS).asInt())
+                .withErrorPkts((short) json.get(ERROR_PKTS).asInt())
+                .withDropPkts((short) json.get(DROP_PKTS).asInt())
+                .build();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
new file mode 100644
index 0000000..30737ac
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.web;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.codec.CodecService;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.onosproject.openstacktelemetry.codec.FlowInfoJsonCodec;
+import org.onosproject.openstacktelemetry.codec.StatsInfoJsonCodec;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the JSON codec brokering service for OpenstackTelemetry.
+ */
+@Component(immediate = true)
+public class OpenstackTelemetryCodecRegister {
+
+    private final org.slf4j.Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CodecService codecService;
+
+    @Activate
+    protected void activate() {
+        codecService.registerCodec(StatsInfo.class, new StatsInfoJsonCodec());
+        codecService.registerCodec(FlowInfo.class, new FlowInfoJsonCodec());
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        codecService.unregisterCodec(StatsInfo.class);
+        codecService.unregisterCodec(FlowInfo.class);
+
+        log.info("Stopped");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java
new file mode 100644
index 0000000..aed2b76
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.VlanId;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.codec.impl.CodecManager;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.onosproject.openstacktelemetry.impl.DefaultFlowInfo;
+import org.onosproject.openstacktelemetry.impl.DefaultStatsInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.openstacktelemetry.codec.FlowInfoJsonMatcher.matchesFlowInfo;
+
+/**
+ * Unit tests for FlowInfo codec.
+ */
+public class FlowInfoJsonCodecTest {
+    MockCodecContext context;
+    JsonCodec<FlowInfo> flowInfoCodec;
+    JsonCodec<StatsInfo> statsInfoCodec;
+    final CoreService mockCoreService = createMock(CoreService.class);
+    private static final String REST_APP_ID = "org.onosproject.rest";
+
+    private static final int INPUT_INTERFACE_ID = 1;
+    private static final int OUTPUT_INTERFACE_ID = 2;
+
+    private static final int VLAN_ID = 1;
+    private static final int PROTOCOL = 1;
+    private static final int FLOW_TYPE = 1;
+    private static final String DEVICE_ID = "foo";
+
+    private static final String SRC_IP_ADDRESS = "10.10.10.1";
+    private static final int SRC_IP_PREFIX = 24;
+    private static final String DST_IP_ADDRESS = "20.20.20.1";
+    private static final int DST_IP_PREFIX = 24;
+    private static final int SRC_PORT = 1000;
+    private static final int DST_PORT = 2000;
+    private static final String SRC_MAC_ADDRESS = "AA:BB:CC:DD:EE:FF";
+    private static final String DST_MAC_ADDRESS = "FF:EE:DD:CC:BB:AA";
+
+    private static final long LONG_VALUE = 1L;
+    private static final int INTEGER_VALUE = 1;
+    private static final short SHORT_VALUE = (short) 1;
+
+
+    @Before
+    public void setUp() {
+        context = new MockCodecContext();
+        flowInfoCodec = new FlowInfoJsonCodec();
+        statsInfoCodec = new StatsInfoJsonCodec();
+
+        assertThat(flowInfoCodec, notNullValue());
+        assertThat(statsInfoCodec, notNullValue());
+
+        expect(mockCoreService.registerApplication(REST_APP_ID))
+                .andReturn(APP_ID).anyTimes();
+        replay(mockCoreService);
+        context.registerService(CoreService.class, mockCoreService);
+    }
+
+    /**
+     * Tests the flow info encoding.
+     */
+    @Test
+    public void testEncode() {
+        StatsInfo statsInfo = new DefaultStatsInfo.DefaultBuilder()
+                                    .withStartupTime(LONG_VALUE)
+                                    .withFstPktArrTime(LONG_VALUE)
+                                    .withLstPktOffset(INTEGER_VALUE)
+                                    .withPrevAccBytes(LONG_VALUE)
+                                    .withPrevAccPkts(INTEGER_VALUE)
+                                    .withCurrAccBytes(LONG_VALUE)
+                                    .withCurrAccPkts(INTEGER_VALUE)
+                                    .withErrorPkts(SHORT_VALUE)
+                                    .withDropPkts(SHORT_VALUE)
+                                    .build();
+        FlowInfo flowInfo = new DefaultFlowInfo.DefaultBuilder()
+                                    .withFlowType((byte) FLOW_TYPE)
+                                    .withDeviceId(DeviceId.deviceId(DEVICE_ID))
+                                    .withInputInterfaceId(INPUT_INTERFACE_ID)
+                                    .withOutputInterfaceId(OUTPUT_INTERFACE_ID)
+                                    .withVlanId(VlanId.vlanId((short) VLAN_ID))
+                                    .withSrcIp(IpPrefix.valueOf(
+                                            IpAddress.valueOf(SRC_IP_ADDRESS), SRC_IP_PREFIX))
+                                    .withDstIp(IpPrefix.valueOf(
+                                            IpAddress.valueOf(DST_IP_ADDRESS), DST_IP_PREFIX))
+                                    .withSrcPort(TpPort.tpPort(SRC_PORT))
+                                    .withDstPort(TpPort.tpPort(DST_PORT))
+                                    .withProtocol((byte) PROTOCOL)
+                                    .withSrcMac(MacAddress.valueOf(SRC_MAC_ADDRESS))
+                                    .withDstMac(MacAddress.valueOf(DST_MAC_ADDRESS))
+                                    .withStatsInfo(statsInfo)
+                                    .build();
+
+        ObjectNode nodeJson = flowInfoCodec.encode(flowInfo, context);
+        assertThat(nodeJson, matchesFlowInfo(flowInfo));
+    }
+
+    /**
+     * Mock codec context for use in codec unit tests.
+     */
+    private class MockCodecContext implements CodecContext {
+        private final ObjectMapper mapper = new ObjectMapper();
+        private final CodecManager manager = new CodecManager();
+        private final Map<Class<?>, Object> services = new HashMap<>();
+
+        /**
+         * Constructs a new mock codec context.
+         */
+        public MockCodecContext() {
+            manager.activate();
+        }
+
+        @Override
+        public ObjectMapper mapper() {
+            return mapper;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> JsonCodec<T> codec(Class<T> entityClass) {
+            if (entityClass == FlowInfo.class) {
+                return (JsonCodec<T>) flowInfoCodec;
+            }
+            if (entityClass == StatsInfo.class) {
+                return (JsonCodec<T>) statsInfoCodec;
+            }
+            return manager.getCodec(entityClass);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public <T> T getService(Class<T> serviceClass) {
+            return (T) services.get(serviceClass);
+        }
+
+        // for registering mock services
+        public <T> void registerService(Class<T> serviceClass, T impl) {
+            services.put(serviceClass, impl);
+        }
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonMatcher.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonMatcher.java
new file mode 100644
index 0000000..d795aee
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonMatcher.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+
+/**
+ * Hamcrest matcher for flow info.
+ */
+public final class FlowInfoJsonMatcher extends TypeSafeDiagnosingMatcher<JsonNode> {
+
+    private final FlowInfo flowInfo;
+
+    private static final String FLOW_TYPE = "flowType";
+    private static final String DEVICE_ID = "deviceId";
+    private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
+    private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
+
+    private static final String VLAN_ID = "vlanId";
+    private static final String VXLAN_ID = "vxlanId";
+    private static final String SRC_IP = "srcIp";
+    private static final String SRC_IP_PREFIX_LEN = "srcIpPrefixLength";
+    private static final String DST_IP = "dstIp";
+    private static final String DST_IP_PREFIX_LEN = "dstIpPrefixLength";
+    private static final String SRC_PORT = "srcPort";
+    private static final String DST_PORT = "dstPort";
+    private static final String PROTOCOL = "protocol";
+    private static final String SRC_MAC = "srcMac";
+    private static final String DST_MAC = "dstMac";
+    private static final String STATS_INFO = "statsInfo";
+
+    private FlowInfoJsonMatcher(FlowInfo flowInfo) {
+        this.flowInfo = flowInfo;
+    }
+
+    @Override
+    protected boolean matchesSafely(JsonNode jsonNode, Description description) {
+
+        // check flow type
+        String jsonFlowType = jsonNode.get(FLOW_TYPE).asText();
+        String flowType = String.valueOf(flowInfo.flowType());
+        if (!jsonFlowType.equals(flowType)) {
+            description.appendText("flow type was " + jsonFlowType);
+            return false;
+        }
+
+        // check device id
+        String jsonDeviceId = jsonNode.get(DEVICE_ID).asText();
+        String deviceId = flowInfo.deviceId().toString();
+        if (!jsonDeviceId.equals(deviceId)) {
+            description.appendText("device id was " + jsonDeviceId);
+            return false;
+        }
+
+        // check input interface id
+        int jsonInputInterfaceId = jsonNode.get(INPUT_INTERFACE_ID).asInt();
+        int inputInterfaceId = flowInfo.inputInterfaceId();
+        if (jsonInputInterfaceId != inputInterfaceId) {
+            description.appendText("input interface id was " + jsonInputInterfaceId);
+            return false;
+        }
+
+        // check output interface id
+        int jsonOutputInterfaceId = jsonNode.get(OUTPUT_INTERFACE_ID).asInt();
+        int outputInterfaceId = flowInfo.outputInterfaceId();
+        if (jsonOutputInterfaceId != outputInterfaceId) {
+            description.appendText("output interface id was " + jsonInputInterfaceId);
+            return false;
+        }
+
+        // check vlan id
+        String jsonVlanId = jsonNode.get(VLAN_ID).asText();
+        String vlanId = flowInfo.vlanId().toString();
+        if (!jsonVlanId.equals(vlanId)) {
+            description.appendText("VLAN id was " + jsonVlanId);
+            return false;
+        }
+
+        // check source IP
+        String jsonSrcIp = jsonNode.get(SRC_IP).asText();
+        String srcIp = flowInfo.srcIp().address().toString();
+        if (!jsonSrcIp.equals(srcIp)) {
+            description.appendText("Source IP was " + jsonSrcIp);
+            return false;
+        }
+
+        // check destination IP
+        String jsonDstIp = jsonNode.get(DST_IP).asText();
+        String dstIp = flowInfo.dstIp().address().toString();
+        if (!jsonDstIp.equals(dstIp)) {
+            description.appendText("Destination IP was " + jsonDstIp);
+            return false;
+        }
+
+        // check source IP prefix length
+        int jsonSrcPrefixLength = jsonNode.get(SRC_IP_PREFIX_LEN).asInt();
+        int srcPrefixLength = flowInfo.srcIp().prefixLength();
+        if (jsonSrcPrefixLength != srcPrefixLength) {
+            description.appendText("Source IP prefix length was " + jsonSrcPrefixLength);
+            return false;
+        }
+
+        // check destination IP prefix length
+        int jsonDstPrefixLength = jsonNode.get(DST_IP_PREFIX_LEN).asInt();
+        int dstPrefixLength = flowInfo.dstIp().prefixLength();
+        if (jsonDstPrefixLength != dstPrefixLength) {
+            description.appendText("Destination IP prefix length was " + jsonDstPrefixLength);
+            return false;
+        }
+
+        // check source port
+        int jsonSrcPort = jsonNode.get(SRC_PORT).asInt();
+        int srcPort = flowInfo.srcPort().toInt();
+        if (jsonSrcPort != srcPort) {
+            description.appendText("Source port was " + jsonSrcPort);
+            return false;
+        }
+
+        // check destination port
+        int jsonDstPort = jsonNode.get(DST_PORT).asInt();
+        int dstPort = flowInfo.dstPort().toInt();
+        if (jsonDstPort != dstPort) {
+            description.appendText("Destination port was " + jsonDstPort);
+            return false;
+        }
+
+        // check protocol
+        String jsonProtocol = jsonNode.get(PROTOCOL).asText();
+        String protocol = String.valueOf(flowInfo.protocol());
+        if (!jsonProtocol.equals(protocol)) {
+            description.appendText("Protocol was " + jsonProtocol);
+            return false;
+        }
+
+        // check source mac
+        String jsonSrcMac = jsonNode.get(SRC_MAC).asText();
+        String srcMac = flowInfo.srcMac().toString();
+        if (!jsonSrcMac.equals(srcMac)) {
+            description.appendText("Source MAC was " + jsonSrcMac);
+            return false;
+        }
+
+        // check destination mac
+        String jsonDstMac = jsonNode.get(DST_MAC).asText();
+        String dstMac = flowInfo.dstMac().toString();
+        if (!jsonDstMac.equals(dstMac)) {
+            description.appendText("Destination MAC was " + jsonDstMac);
+            return false;
+        }
+
+        // check stats info
+        JsonNode jsonStatsInfo = jsonNode.get(STATS_INFO);
+        if (jsonStatsInfo != null) {
+            StatsInfo statsInfo = flowInfo.statsInfo();
+            StatsInfoJsonMatcher infoMatcher =
+                    StatsInfoJsonMatcher.matchStatsInfo(statsInfo);
+            return infoMatcher.matches(jsonStatsInfo);
+        }
+
+        return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText(flowInfo.toString());
+    }
+
+    /**
+     * Factory to allocate an flow info matcher.
+     *
+     * @param flowInfo flow info object we are looking for
+     * @return matcher
+     */
+    public static FlowInfoJsonMatcher matchesFlowInfo(FlowInfo flowInfo) {
+        return new FlowInfoJsonMatcher(flowInfo);
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/StatsInfoJsonMatcher.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/StatsInfoJsonMatcher.java
new file mode 100644
index 0000000..ffebaf5
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/StatsInfoJsonMatcher.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+
+/**
+ * Hamcrest matcher for StatsInfoJsonCodec.
+ */
+public final class StatsInfoJsonMatcher extends TypeSafeDiagnosingMatcher<JsonNode> {
+
+    private final StatsInfo statsInfo;
+
+    private static final String STARTUP_TIME = "startupTime";
+    private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
+    private static final String LST_PKT_OFFSET = "lstPktOffset";
+    private static final String PREV_ACC_BYTES = "prevAccBytes";
+    private static final String PREV_ACC_PKTS = "prevAccPkts";
+    private static final String CURR_ACC_BYTES = "currAccBytes";
+    private static final String CURR_ACC_PKTS = "currAccPkts";
+    private static final String ERROR_PKTS = "errorPkts";
+    private static final String DROP_PKTS = "dropPkts";
+
+    private StatsInfoJsonMatcher(StatsInfo statsInfo) {
+        this.statsInfo = statsInfo;
+    }
+
+    @Override
+    protected boolean matchesSafely(JsonNode jsonNode, Description description) {
+
+        // check startup time
+        long jsonStartupTime = jsonNode.get(STARTUP_TIME).asLong();
+        long startupTime = statsInfo.startupTime();
+        if (jsonStartupTime != startupTime) {
+            description.appendText("startup time was " + jsonStartupTime);
+            return false;
+        }
+
+        // check first packet arrival time
+        long jsonFstPktArrTime = jsonNode.get(FST_PKT_ARR_TIME).asLong();
+        long fstPktArrTime = statsInfo.fstPktArrTime();
+        if (jsonFstPktArrTime != fstPktArrTime) {
+            description.appendText("first packet arrival time was " + jsonFstPktArrTime);
+            return false;
+        }
+
+        // check last packet offset
+        int jsonLstPktOffset = jsonNode.get(LST_PKT_OFFSET).asInt();
+        int lstPktOffset = statsInfo.lstPktOffset();
+        if (jsonLstPktOffset != lstPktOffset) {
+            description.appendText("last packet offset was " + jsonLstPktOffset);
+            return false;
+        }
+
+        // check previous accumulated bytes
+        long jsonPrevAccBytes = jsonNode.get(PREV_ACC_BYTES).asLong();
+        long preAccBytes = statsInfo.prevAccBytes();
+        if (jsonPrevAccBytes != preAccBytes) {
+            description.appendText("previous accumulated bytes was " + jsonPrevAccBytes);
+            return false;
+        }
+
+        // check previous accumulated packets
+        int jsonPreAccPkts = jsonNode.get(PREV_ACC_PKTS).asInt();
+        int preAccPkts = statsInfo.prevAccPkts();
+        if (jsonPreAccPkts != preAccPkts) {
+            description.appendText("previous accumulated packets was " + jsonPreAccPkts);
+            return false;
+        }
+
+        // check current accumulated bytes
+        long jsonCurrAccBytes = jsonNode.get(CURR_ACC_BYTES).asLong();
+        long currAccBytes = statsInfo.currAccBytes();
+        if (jsonCurrAccBytes != currAccBytes) {
+            description.appendText("current accumulated bytes was " + jsonCurrAccBytes);
+            return false;
+        }
+
+        // check current accumulated packets
+        int jsonCurrAccPkts = jsonNode.get(CURR_ACC_PKTS).asInt();
+        int currAccPkts = statsInfo.currAccPkts();
+        if (jsonCurrAccPkts != currAccPkts) {
+            description.appendText("current accumulated packets was " + jsonCurrAccPkts);
+            return false;
+        }
+
+        // check error packets
+        short jsonErrorPkts = (short) jsonNode.get(ERROR_PKTS).asInt();
+        short errorPkts = statsInfo.errorPkts();
+        if (jsonErrorPkts != errorPkts) {
+            description.appendText("error packets was " + jsonErrorPkts);
+            return false;
+        }
+
+        // check drop packets
+        short jsonDropPkts = (short) jsonNode.get(DROP_PKTS).asInt();
+        short dropPkts = statsInfo.dropPkts();
+        if (jsonDropPkts != dropPkts) {
+            description.appendText("drop packets was " + jsonDropPkts);
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText(statsInfo.toString());
+    }
+
+    /**
+     * Factory to allocate a stats info matcher.
+     *
+     * @param statsInfo stats info object we are looking for
+     * @return matcher
+     */
+    public static StatsInfoJsonMatcher matchStatsInfo(StatsInfo statsInfo) {
+        return new StatsInfoJsonMatcher(statsInfo);
+    }
+}