Made changes as per comments.
kafkaProducer is now non-static.
TODO: Make KafkaPublisherManager a Service instead of a Singleton.
Kafka event publishing.
Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
new file mode 100644
index 0000000..f1d0c97
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.converter.LinkEventConverter;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Converts ONOS device and link events into {@link OnosEvent} wrappers
+ * carrying the message produced by the matching {@link EventConverter}.
+ */
+@Component(immediate = true)
+@Service
+public class EventConversionManager implements EventConversionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private EventConverter deviceEventConverter;
+    private EventConverter linkEventConverter;
+
+    @Activate
+    protected void activate() {
+        deviceEventConverter = new DeviceEventConverter();
+        linkEventConverter = new LinkEventConverter();
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    /** Wraps a device or link event; any other event raises IllegalArgumentException. */
+    @Override
+    public OnosEvent convertEvent(Event<?, ?> event) {
+        if (event instanceof DeviceEvent) {
+            return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
+        }
+        if (event instanceof LinkEvent) {
+            return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
+        }
+        throw new IllegalArgumentException("Unsupported event type");
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
new file mode 100644
index 0000000..ebbac6b
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
+import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
+import org.onosproject.kafkaintegration.listener.ListenerFactory;
+import org.onosproject.kafkaintegration.listener.OnosEventListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Tracks registered external applications and their event subscriptions,
+ * starting and stopping ONOS event listeners as subscriptions change.
+ */
+@Component(immediate = true)
+@Service
+public class EventSubscriptionManager implements EventSubscriptionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Stores the currently registered applications for event export service.
+    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
+    // Stores the subscribers of each event type.
+    private Map<Type, List<EventSubscriber>> subscriptions;
+
+    private static final String REGISTERED_APPS = "registered-applications";
+
+    private static final String SUBSCRIBED_APPS = "event-subscriptions";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ApplicationId appId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication("org.onosproject.kafkaintegration");
+
+        // Both maps are built through the storage service's consistent map builder.
+        registeredApps = storageService
+                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
+                .withName(REGISTERED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
+                .build().asJavaMap();
+
+        subscriptions = storageService
+                .<Type, List<EventSubscriber>>consistentMapBuilder()
+                .withName(SUBSCRIBED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriber.class,
+                                                 OnosEvent.class,
+                                                 OnosEvent.Type.class,
+                                                 DefaultEventSubscriber.class,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
+                .build().asJavaMap();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    /** Returns the group id of the application, creating a random one on first registration. */
+    @Override
+    public EventSubscriberGroupId registerListener(String appName) {
+
+        // TODO: Remove it once ONOS provides a mechanism for external apps
+        // to register with the core service. See Jira - 4409
+        ApplicationId externalAppId = coreService.registerApplication(appName);
+
+        return registeredApps.computeIfAbsent(externalAppId,
+                key -> new EventSubscriberGroupId(UUID.randomUUID()));
+    }
+
+    /** Drops the registration of the application; fails with NPE if the core has no id for it. */
+    @Override
+    public void unregisterListener(String appName) {
+        ApplicationId externalAppId = checkNotNull(coreService.getAppId(appName));
+        registeredApps.remove(externalAppId);
+    }
+
+    /**
+     * Validates the subscriber, starts the listener for its event type and records the subscription.
+     */
+    @Override
+    public void subscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        OnosEventListener onosListener = getListener(subscriber.eventType());
+        // Guava's Preconditions only expands %s placeholders, not SLF4J-style {}.
+        checkNotNull(onosListener, "No listener for the supported event type - %s", subscriber.eventType());
+
+        applyListenerAction(subscriber.eventType(), onosListener, ListenerAction.START);
+
+        // update internal state
+        List<EventSubscriber> subscriptionList = subscriptions.get(subscriber.eventType());
+        if (subscriptionList == null) {
+            subscriptionList = new ArrayList<>();
+        }
+        subscriptionList.add(subscriber);
+        subscriptions.put(subscriber.eventType(), subscriptionList);
+
+        log.info("Subscription for {} event by {} successful", subscriber.eventType(), subscriber.appName());
+    }
+
+    /**
+     * Checks if the application has registered.
+     *
+     * @param appName application name
+     * @return true if application has registered
+     */
+    private boolean registeredApplication(String appName) {
+        checkNotNull(appName);
+        // A name without a core application id is reported as unregistered so the
+        // caller can raise InvalidApplicationException instead of failing with an NPE.
+        ApplicationId externalAppId = coreService.getAppId(appName);
+        if (externalAppId != null && registeredApps.containsKey(externalAppId)) {
+            return true;
+        }
+        log.debug("{} is not registered", appName);
+        return false;
+    }
+
+    /**
+     * Actions that can be performed on the ONOS Event Listeners.
+     * Used by applyListenerAction to choose between startListener and stopListener.
+     */
+    private enum ListenerAction {
+        START, STOP;
+    }
+
+    /**
+     * Applies the specified action on the Listener.
+     *
+     * @param eventType the ONOS Event type registered by the application
+     * @param onosListener ONOS event listener
+     * @param action to be performed on the listener
+     */
+    private void applyListenerAction(Type eventType,
+                                     OnosEventListener onosListener,
+                                     ListenerAction action) {
+        switch (eventType) {
+        case DEVICE:
+            if (action == ListenerAction.START) {
+                onosListener.startListener(DEVICE, deviceService);
+            } else {
+                onosListener.stopListener(DEVICE, deviceService);
+            }
+            break;
+        case LINK:
+            if (action == ListenerAction.START) {
+                onosListener.startListener(LINK, linkService);
+            } else {
+                onosListener.stopListener(LINK, linkService);
+            }
+            break;
+        default:
+            log.error("Cannot {} listener. Unsupported event type {} ",
+                      action.toString(), eventType.toString());
+        }
+    }
+
+    /**
+     * Returns the ONOS event listener corresponding to the ONOS Event type.
+     *
+     * @param eventType ONOS event type
+     * @return ONOS event listener
+     */
+    private OnosEventListener getListener(Type eventType) {
+        checkNotNull(eventType);
+        // Callers null-check the result before using the listener.
+        ListenerFactory factory = ListenerFactory.getInstance();
+        return factory.getListener(eventType);
+    }
+
+    /**
+     * Checks if the group id is valid for this registered application.
+     *
+     * @param groupId GroupId assigned to the subscriber
+     * @param appName Registered Application name
+     * @return true if valid groupId and false otherwise
+     */
+    private boolean validGroupId(EventSubscriberGroupId groupId,
+                                 String appName) {
+        checkNotNull(groupId);
+
+        ApplicationId externalAppId = coreService.getAppId(appName);
+        if (externalAppId == null) {
+            return false;
+        }
+
+        // groupId is non-null here, so comparing from its side tolerates an
+        // application that has no group id stored in registeredApps.
+        return groupId.equals(registeredApps.get(externalAppId));
+    }
+
+    /**
+     * Validates the subscriber, removes its subscription and stops the listener
+     * for the event type when it was the only subscriber.
+     */
+    @Override
+    public void unsubscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        if (!eventSubscribed(subscriber)) {
+            log.error("No subscription to {} was found", subscriber.eventType());
+            return;
+        }
+
+        // If this is the only subscriber listening for this event,
+        // stop the listener.
+        List<EventSubscriber> subscribers = subscriptions.get(subscriber.eventType());
+        if (subscribers.size() == 1) {
+            OnosEventListener onosListener = getListener(subscriber.eventType());
+            // Guava's Preconditions only expands %s placeholders, not SLF4J-style {}.
+            checkNotNull(onosListener,
+                         "No listener for the supported event type - %s",
+                         subscriber.eventType());
+            applyListenerAction(subscriber.eventType(), onosListener,
+                                ListenerAction.STOP);
+        }
+
+        // update internal state.
+        subscribers.remove(subscriber);
+        subscriptions.put(subscriber.eventType(), subscribers);
+
+        log.info("Unsubscribed {} for {} events", subscriber.appName(),
+                 subscriber.eventType());
+    }
+
+    @Override
+    public List<EventSubscriber> getEventSubscribers(Type type) {
+        // Falls back to an empty immutable list when the type has no entry.
+        return subscriptions.getOrDefault(type, ImmutableList.of());
+    }
+
+    /**
+     * Checks if the subscriber has already subscribed to the requested event
+     * type.
+     *
+     * @param subscriber the subscriber to a specific ONOS event
+     * @return true if subscriber has subscribed to the ONOS event
+     */
+    private boolean eventSubscribed(EventSubscriber subscriber) {
+
+        List<EventSubscriber> subscriberList =
+                subscriptions.get(subscriber.eventType());
+
+        if (subscriberList == null) {
+            return false;
+        }
+
+        return subscriberList.contains(subscriber);
+    }
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
new file mode 100644
index 0000000..22705d5
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.codec.CodecService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers the {@link EventSubscriber} JSON codec on activation and unregisters it on deactivation.
+ */
+@Component(immediate = true)
+public class KafkaCodecRegistrator {
+    private static final Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator.class);
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CodecService codecService;
+
+    @Activate
+    public void activate() {
+        codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        codecService.unregisterCodec(EventSubscriber.class);
+        log.info("Stopped");
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
new file mode 100644
index 0000000..104619a
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.kafkaintegration.api.ExportableEventListener;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Dispatch ONOS Events to all interested Listeners.
+ *
+ */
+
+public final class KafkaPublisherManager
+ extends AbstractListenerManager<OnosEvent, ExportableEventListener> implements KafkaPublisherService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // Exists to defeat instantiation
+ private KafkaPublisherManager() {
+ }
+
+ //TODO: If possible, get rid of Singleton implementation.
+ private static class SingletonHolder {
+ private static final KafkaPublisherManager INSTANCE = new KafkaPublisherManager();
+ }
+
+ /**
+ * Returns a static reference to the Listener Factory.
+ *
+ * @return singleton object
+ */
+ public static KafkaPublisherManager getInstance() {
+ return SingletonHolder.INSTANCE;
+ }
+
+ @Override
+ public void publish(Type eventType, GeneratedMessage message) {
+ log.debug("Dispatching ONOS Event {}", eventType);
+ post(new OnosEvent(eventType, message));
+ }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
new file mode 100644
index 0000000..64af4e8
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component(immediate = true)
+public class KafkaStorageManager implements KafkaEventStorageService {
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private TreeMap<Long, OnosEvent> kafkaEventStore;
+
+ private AtomicValue<Long> lastPublishedEvent;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private ScheduledExecutorService gcExService;
+
+ private InternalGarbageCollector gcTask;
+
+ // Thread scheduler parameters.
+ private final long delay = 0;
+ private final long period = 1;
+
+ @Activate
+ protected void activate() {
+ kafkaEventStore = new TreeMap<Long, OnosEvent>();
+ lastPublishedEvent = storageService.<Long>atomicValueBuilder()
+ .withName("onos-app-kafka-published-seqNumber").build()
+ .asAtomicValue();
+
+ startGC();
+
+ log.info("Started");
+ }
+
+ private void startGC() {
+ log.info("Starting Garbage Collection Service");
+ gcExService = Executors.newSingleThreadScheduledExecutor();
+ gcTask = new InternalGarbageCollector();
+ gcExService.scheduleAtFixedRate(gcTask, delay, period,
+ TimeUnit.SECONDS);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ stopGC();
+ log.info("Stopped");
+ }
+
+ private void stopGC() {
+ log.info("Stopping Garbage Collection Service");
+ gcExService.shutdown();
+ }
+
+ @Override
+ public boolean insertCacheEntry(OnosEvent e) {
+ // TODO: Fill in the code once the event carries timestamp info.
+ return true;
+ }
+
+ @Override
+ public void updateLastPublishedEntry(Long sequenceNumber) {
+ this.lastPublishedEvent.set(sequenceNumber);
+ }
+
+ /**
+ * Removes events from the Kafka Event Store which have been published.
+ *
+ */
+ private class InternalGarbageCollector implements Runnable {
+
+ @Override
+ public void run() {
+ kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
+ }
+ }
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
new file mode 100644
index 0000000..3489f6f
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Codec for encoding/decoding a Subscriber object to/from JSON.
+ * Decoding requires the appName, groupId and eventType members.
+ */
+public final class SubscriberCodec extends JsonCodec<EventSubscriber> {
+
+    // JSON field names
+    private static final String NAME = "appName";
+    private static final String GROUP_ID = "groupId";
+    private static final String EVENT_TYPE = "eventType";
+
+    @Override
+    public ObjectNode encode(EventSubscriber data, CodecContext context) {
+        checkNotNull(data, "Subscriber cannot be null");
+        return context.mapper().createObjectNode().put(NAME, data.appName())
+                .put(GROUP_ID, data.subscriberGroupId().getId().toString())
+                .put(EVENT_TYPE, data.eventType().toString());
+    }
+
+    /** Builds a subscriber from JSON; a null document or a missing member raises NullPointerException. */
+    @Override
+    public EventSubscriber decode(ObjectNode json, CodecContext context) {
+        checkNotNull(json, "Subscriber JSON cannot be null");
+
+        // Name the missing member instead of failing with a bare NPE on asText().
+        String appName = checkNotNull(json.get(NAME), "%s is required", NAME).asText();
+        String groupId = checkNotNull(json.get(GROUP_ID), "%s is required", GROUP_ID).asText();
+        String eventType = checkNotNull(json.get(EVENT_TYPE), "%s is required", EVENT_TYPE).asText();
+
+        EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber.Builder();
+        resultBuilder.setAppName(appName);
+        resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.fromString(groupId)));
+        resultBuilder.setEventType(Type.valueOf(eventType));
+        return resultBuilder.build();
+    }
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
new file mode 100644
index 0000000..a2950fa
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * API implementation classes.
+ */
+package org.onosproject.kafkaintegration.impl;