diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
new file mode 100644
index 0000000..bf1fbf8
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api.dto;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * Representation of a subscription to an event type.
+ *
+ */
+public final class DefaultEventSubscriber implements EventSubscriber {
+    private final String appName;
+    private final EventSubscriberGroupId subscriberGroupId;
+    private final Type eventType;
+
+    /**
+     * Creates a new Event Subscriber.
+     *
+     * @param name Application Name
+     * @param groupId Subscriber group id of the application
+     * @param eventType ONOS event type
+     */
+    public DefaultEventSubscriber(String name, EventSubscriberGroupId groupId,
+                                  Type eventType) {
+        this.appName = checkNotNull(name);
+        this.subscriberGroupId = checkNotNull(groupId);
+        this.eventType = checkNotNull(eventType);
+    }
+
+    @Override
+    public String appName() {
+        return appName;
+    }
+
+    @Override
+    public EventSubscriberGroupId subscriberGroupId() {
+        return subscriberGroupId;
+    }
+
+    @Override
+    public Type eventType() {
+        return eventType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(appName, subscriberGroupId, eventType);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof DefaultEventSubscriber) {
+            DefaultEventSubscriber sub = (DefaultEventSubscriber) o;
+            if (sub.appName.equals(appName)
+                    && sub.subscriberGroupId.equals(subscriberGroupId)
+                    && sub.eventType.equals(eventType)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this).add("appName", appName)
+                .addValue(subscriberGroupId.toString())
+                .add("eventType", eventType).toString();
+    }
+    /**
+     * To create an instance of the builder.
+     *
+     * @return instance of builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+    /**
+     * Builder class for Event subscriber.
+     */
+    public static final class Builder implements EventSubscriber.Builder {
+        private String appName;
+        private EventSubscriberGroupId subscriberGroupId;
+        private Type eventType;
+
+        @Override
+        public Builder setAppName(String appName) {
+            this.appName = appName;
+            return this;
+        }
+
+        @Override
+        public Builder setSubscriberGroupId(EventSubscriberGroupId
+                                                            subscriberGroupId) {
+            this.subscriberGroupId = subscriberGroupId;
+            return this;
+        }
+
+        @Override
+        public Builder setEventType(Type eventType) {
+            this.eventType = eventType;
+            return this;
+        }
+
+        @Override
+        public EventSubscriber build() {
+            checkNotNull(appName, "App name cannot be null");
+            checkNotNull(subscriberGroupId, "Subscriber group ID cannot " +
+                    "be " +
+                    "null");
+            checkNotNull(eventType, "Event type cannot be null");
+
+            return new DefaultEventSubscriber(appName,
+                                              subscriberGroupId,
+                                              eventType);
+        }
+    }
+
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
index 95647e7..3de8199 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
@@ -13,87 +13,42 @@
  * limitations under the License.
  */
 package org.onosproject.kafkaintegration.api.dto;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 
 /**
- * Representation of a subscription to an event type.
- *
+ * Abstraction of subscription to an event type.
  */
-public final class EventSubscriber {
-    private final String appName;
-    private final EventSubscriberGroupId subscriberGroupId;
-    private final Type eventType;
+public interface EventSubscriber {
+    /**
+     * Returns the application name.
+     *
+     * @return application name.
+     */
+    String appName();
 
     /**
-     * Creates a new Event Subscriber.
-     *
-     * @param name Application Name
-     * @param groupId Subscriber group id of the application
-     * @param eventType ONOS event type
+     * Returns the subscriber group ID.
+     * @return subscriber group ID.
      */
-    public EventSubscriber(String name, EventSubscriberGroupId groupId,
-                           Type eventType) {
-        this.appName = checkNotNull(name);
-        this.subscriberGroupId = checkNotNull(groupId);
-        this.eventType = checkNotNull(eventType);
-    }
-
-    /**
-     * Returns the Application Name.
-     *
-     * @return application name
-     */
-    public String appName() {
-        return appName;
-    }
-
-    /**
-     * Returns the Subscriber Group Id.
-     *
-     * @return Subscriber Group Id
-     */
-    public EventSubscriberGroupId subscriberGroupId() {
-        return subscriberGroupId;
-    }
+    EventSubscriberGroupId subscriberGroupId();
 
     /**
      * Returns the Event type.
      *
      * @return ONOS Event Type
      */
-    public Type eventType() {
-        return eventType;
-    }
+    Type eventType();
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(appName, subscriberGroupId, eventType);
-    }
+    /**
+     * An event subscriber builder.
+     */
+    interface Builder {
+        Builder setAppName(String appName);
 
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof EventSubscriber) {
-            EventSubscriber sub = (EventSubscriber) o;
-            if (sub.appName.equals(appName)
-                    && sub.subscriberGroupId.equals(subscriberGroupId)
-                    && sub.eventType.equals(eventType)) {
-                return true;
-            }
-        }
+        Builder setSubscriberGroupId(EventSubscriberGroupId subscriberGroupId);
 
-        return false;
-    }
+        Builder setEventType(Type eventType);
 
-    @Override
-    public String toString() {
-        return toStringHelper(this).add("appName", appName)
-                .addValue(subscriberGroupId.toString())
-                .add("eventType", eventType).toString();
+        EventSubscriber build();
     }
 }
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index 4e1d535..20b9427 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -166,6 +166,7 @@
                         com.fasterxml.jackson.core,
                         org.onlab.packet.*,
                         org.onosproject.*,
+                        org.onlab.util.*,
                         com.google.common.*
                     </Import-Package>
                     <Web-ContextPath>${web.context}</Web-ContextPath>
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
index 6e1a0e8..e2e8e57 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
@@ -19,6 +19,9 @@
 import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
 import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
 
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -33,8 +36,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
 import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
@@ -98,7 +101,12 @@
                 .<Type, List<EventSubscriber>>consistentMapBuilder()
                 .withName(SUBSCRIBED_APPS)
                 .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 EventSubscriber.class))
+                                                 EventSubscriber.class,
+                                                 OnosEvent.class,
+                                                 OnosEvent.Type.class,
+                                                 DefaultEventSubscriber.class,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
                 .build().asJavaMap();
 
         log.info("Started");
@@ -119,7 +127,6 @@
         return registeredApps.computeIfAbsent(externalAppId,
                                               (key) -> new EventSubscriberGroupId(UUID
                                                       .randomUUID()));
-
     }
 
     @Override
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
new file mode 100644
index 0000000..47098fa
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.codec.CodecService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.rest.SubscriberCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the JSON codec brokering service for Kafka app.
+ */
+@Component(immediate = true)
+public class KafkaCodecRegistrator {
+    private static Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator
+                                                                .class);
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CodecService codecService;
+
+    @Activate
+    public void activate() {
+        codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
index 520976a..0456362 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
@@ -28,7 +28,6 @@
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.onosproject.codec.JsonCodec;
 import org.onosproject.kafkaintegration.api.EventExporterService;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
@@ -54,9 +53,10 @@
             "De-Registered Listener successfully";
     public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
             "Event Registration successfull";
+    public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
+            "Event subscription unsuccessful";
     public static final String EVENT_SUBSCRIPTION_REMOVED =
             "Event De-Registration successfull";
-
     /**
      * Registers a listener for ONOS Events.
      *
@@ -95,7 +95,7 @@
         EventExporterService service = get(EventExporterService.class);
 
         service.unregisterListener(appName);
-
+        log.info("Unregistered app {}", appName);
         return ok(DEREGISTRATION_SUCCESSFUL).build();
     }
 
@@ -107,6 +107,7 @@
      * @onos.rsModel KafkaSubscription
      */
     @POST
+    @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     @Path("subscribe")
     public Response subscribe(InputStream input) {
@@ -136,11 +137,10 @@
 
         ObjectMapper mapper = new ObjectMapper();
         ObjectNode node = (ObjectNode) mapper.readTree(input);
-
         checkNotNull(node, JSON_NOT_NULL);
-
-        JsonCodec<EventSubscriber> codec = codec(EventSubscriber.class);
-        return codec.decode(node, this);
+        EventSubscriber codec = codec(EventSubscriber.class).decode(node, this);
+        checkNotNull(codec, JSON_NOT_NULL);
+        return codec;
     }
 
     /**
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
index f7a02b4..2876ca1 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
@@ -20,10 +20,10 @@
 
 import org.onosproject.codec.CodecContext;
 import org.onosproject.codec.JsonCodec;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -47,13 +47,18 @@
 
     @Override
     public EventSubscriber decode(ObjectNode json, CodecContext context) {
-        String name = json.path(NAME).asText();
-        String groupId = json.path(GROUP_ID).asText();
-        EventSubscriberGroupId subscriberGroupId = new EventSubscriberGroupId(UUID
-                .fromString(groupId));
-        String eventType = json.path(EVENT_TYPE).asText();
 
-        return new EventSubscriber(name, subscriberGroupId,
-                                   Type.valueOf(eventType));
+        EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber
+                .Builder();
+        String appName = json.get(NAME).asText();
+        resultBuilder.setAppName(appName);
+
+        String subscriberGroupId = json.get(GROUP_ID).asText();
+        resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.
+                fromString(subscriberGroupId)));
+
+        String eventType = json.get(EVENT_TYPE).asText();
+        resultBuilder.setEventType(Type.valueOf(eventType));
+        return resultBuilder.build();
     }
-}
+}
\ No newline at end of file
