Kafka Integration Application (Fix Javadoc errors + review comments)

1. Refactored the application into two Java projects, api and app, as per convention
2. Deleted the onos-app-gpb project. The proto files are consolidated in the
   incubator-protobuf project as per suggestions.
3. Added code to translate ONOS Event POJO messages to GPB format.
4. Implementation of Subscribe and Unsubscribe APIs.
5. Minor changes due to review comments from 9212 and 9053
6. Refactored the proto files so that there is a 1:1 mapping between each core type and its proto message.

Change-Id: I2bcc0de96150f838ccfe9e49293fe61d94062628
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
new file mode 100644
index 0000000..6e1a0e8
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kafkaintegration.api.EventExporterService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
+import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
+import org.onosproject.kafkaintegration.listener.ListenerFactory;
+import org.onosproject.kafkaintegration.listener.OnosEventListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of Event Exporter Service.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventExporterManager implements EventExporterService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Registered applications for the event export service and their group ids.
+    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
+    // Subscribers recorded for each ONOS event type.
+    private Map<Type, List<EventSubscriber>> subscriptions;
+    // Name given to the consistent map behind registeredApps.
+    private static final String REGISTERED_APPS = "registered-applications";
+    // Name given to the consistent map behind subscriptions.
+    private static final String SUBSCRIBED_APPS = "event-subscriptions";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+    // Id returned by the core service when activate() registers this app.
+    private ApplicationId appId;
+
+    /**
+     * Registers this application with the core service and builds the two
+     * consistent maps that hold the registrations and the subscriptions.
+     */
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication("org.onosproject.kafkaintegration");
+
+        Serializer groupIdSerializer = Serializer
+                .using(KryoNamespaces.API, EventSubscriberGroupId.class, UUID.class);
+        registeredApps = storageService
+                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
+                .withName(REGISTERED_APPS).withSerializer(groupIdSerializer)
+                .build().asJavaMap();
+
+        Serializer subscriberSerializer = Serializer
+                .using(KryoNamespaces.API, EventSubscriber.class);
+        subscriptions = storageService.<Type, List<EventSubscriber>>consistentMapBuilder()
+                .withName(SUBSCRIBED_APPS).withSerializer(subscriberSerializer)
+                .build().asJavaMap();
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped"); // logging only; no listener or map is touched here
+    }
+
+    @Override
+    public EventSubscriberGroupId registerListener(String appName) {
+        // TODO: Remove it once ONOS provides a mechanism for external apps
+        // to register with the core service. See Jira - 4409
+        ApplicationId registeredId = coreService.registerApplication(appName);
+
+        // The random group id is created only when the map holds no id for
+        // this application yet; otherwise the stored one is handed back.
+        EventSubscriberGroupId groupId = registeredApps.computeIfAbsent(
+                registeredId, id -> new EventSubscriberGroupId(UUID.randomUUID()));
+        return groupId;
+    }
+
+    @Override
+    public void unregisterListener(String appName) {
+        // checkNotNull throws if the core service returns no id for appName.
+        ApplicationId knownAppId = coreService.getAppId(appName);
+        registeredApps.remove(checkNotNull(knownAppId));
+    }
+
+    /**
+     * Subscribes a registered application to an ONOS event type: starts the
+     * matching listener and records the subscriber under that event type.
+     *
+     * @param subscriber subscription request, must not be null
+     * @throws InvalidGroupIdException if the group id is not the registered one
+     * @throws InvalidApplicationException if the application is not registered
+     */
+    @Override
+    public void subscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+        checkNotNull(subscriber);
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+        if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+        // Guava's checkNotNull only expands %s; a "{}" would be printed verbatim.
+        OnosEventListener onosListener = checkNotNull(getListener(subscriber.eventType()),
+                "No listener for the supported event type - %s", subscriber.eventType());
+        applyListenerAction(subscriber.eventType(), onosListener, ListenerAction.START);
+
+        // update internal state, skipping a subscriber that is already recorded
+        List<EventSubscriber> subscriptionList = subscriptions.get(subscriber.eventType());
+        if (subscriptionList == null) {
+            subscriptionList = new ArrayList<>();
+        }
+        if (!subscriptionList.contains(subscriber)) {
+            subscriptionList.add(subscriber);
+            subscriptions.put(subscriber.eventType(), subscriptionList);
+        }
+        log.info("Subscription for {} event by {} successfull",
+                 subscriber.eventType(), subscriber.appName());
+    }
+
+    /**
+     * Checks if the application has registered.
+     *
+     * @param appName application name, must not be null
+     * @return true if registeredApps holds an entry for the application; false
+     *         otherwise, including when the core service has no id for the name
+     */
+    private boolean registeredApplication(String appName) {
+        checkNotNull(appName);
+        // A missing core id means "not registered"; it must not surface as an NPE.
+        ApplicationId externalAppId = coreService.getAppId(appName);
+        if (externalAppId != null && registeredApps.containsKey(externalAppId)) {
+            return true;
+        }
+        log.debug("{} is not registered", appName);
+        return false;
+    }
+
+    /**
+     * Actions that can be performed on the ONOS Event Listeners.
+     *
+     */
+    private enum ListenerAction {
+        START, STOP;
+    }
+
+    /**
+     * Starts or stops the listener against the service matching the event type;
+     * an event type other than DEVICE or LINK is only logged as an error.
+     *
+     * @param eventType the ONOS Event type registered by the application
+     * @param onosListener ONOS event listener
+     * @param action to be performed on the listener
+     */
+    private void applyListenerAction(Type eventType, OnosEventListener onosListener,
+                                     ListenerAction action) {
+        switch (eventType) {
+        case DEVICE:
+            if (action != ListenerAction.START) {
+                onosListener.stopListener(DEVICE, deviceService);
+            } else {
+                onosListener.startListener(DEVICE, deviceService);
+            }
+            break;
+        case LINK:
+            if (action != ListenerAction.START) {
+                onosListener.stopListener(LINK, linkService);
+            } else {
+                onosListener.startListener(LINK, linkService);
+            }
+            break;
+        default:
+            log.error("Cannot {} listener. Unsupported event type {} ",
+                      action.toString(), eventType.toString());
+        }
+    }
+
+    /**
+     * Asks the ListenerFactory instance for the listener that serves an ONOS
+     * event type.
+     *
+     * @param eventType ONOS event type, must not be null
+     * @return whatever the factory returns for eventType; callers in this class
+     *         null-check it
+     */
+    private OnosEventListener getListener(Type eventType) {
+        checkNotNull(eventType);
+        return ListenerFactory.getInstance().getListener(eventType);
+    }
+
+    /**
+     * Checks if the group id is valid for this registered application.
+     *
+     * @param groupId GroupId assigned to the subscriber, must not be null
+     * @param appName Registered Application name
+     * @return true if groupId equals the id stored for the application; false
+     *         otherwise, including when no id is stored for it
+     */
+    private boolean validGroupId(EventSubscriberGroupId groupId,
+                                 String appName) {
+        checkNotNull(groupId);
+
+        ApplicationId externalAppId = coreService.getAppId(appName);
+        if (externalAppId == null) {
+            return false;
+        }
+        // Compare from the non-null side: the stored id may be missing, e.g.
+        // when the application was unregistered after the caller's check.
+        return groupId.equals(registeredApps.get(externalAppId));
+    }
+
+    @Override
+    public void unsubscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(),
+                          subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        if (!eventSubscribed(subscriber)) {
+            log.error("No subscription to {} was found",
+                      subscriber.eventType());
+            return;
+        }
+
+        // If this is the only subscriber listening for this event,
+        // stop the listener.
+        List<EventSubscriber> subscribers =
+                subscriptions.get(subscriber.eventType());
+        if (subscribers.size() == 1) {
+            OnosEventListener onosListener =
+                    getListener(subscriber.eventType());
+            checkNotNull(onosListener,
+                         "No listener for the supported event type - {}",
+                         subscriber.eventType());
+            applyListenerAction(subscriber.eventType(), onosListener,
+                                ListenerAction.STOP);
+        }
+
+        // update internal state.
+        subscribers.remove(subscriber);
+        subscriptions.put(subscriber.eventType(), subscribers);
+
+        log.info("Unsubscribed {} for {} events", subscriber.appName(),
+                 subscriber.eventType());
+    }
+
+    /**
+     * Tells whether the subscriber is already recorded under the event type it
+     * asks for.
+     *
+     * @param subscriber the subscriber to a specific ONOS event
+     * @return true if the subscriptions map holds a list for the event type and
+     *         that list contains the subscriber; false otherwise
+     */
+    private boolean eventSubscribed(EventSubscriber subscriber) {
+        Type requestedType = subscriber.eventType();
+        List<EventSubscriber> recorded = subscriptions.get(requestedType);
+
+        // A missing list means the map has no entry for this event type.
+        if (recorded != null) {
+            return recorded.contains(subscriber);
+        }
+        return false;
+    }
+
+}