[ONOS-6986] Implement getGroups in GroupProgrammable

Change-Id: I9f25bddb6a8baad74e8e74abd44187a9c3f6520a
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java
index 414d9e2..6eea583 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java
@@ -16,17 +16,34 @@
 
 package org.onosproject.net.group.impl;
 
+import com.google.common.collect.Sets;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupOperations;
 import org.onosproject.net.group.GroupProgrammable;
 import org.onosproject.net.group.GroupProvider;
+import org.onosproject.net.group.GroupProviderService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
+
 /**
  * Driver-based Group rule provider.
  */
@@ -37,19 +54,64 @@
     // To be extracted for reuse as we deal with other.
     private static final String SCHEME = "default";
     private static final String PROVIDER_NAME = "org.onosproject.provider";
+
+    // potentially positive device event
+    private static final Set<DeviceEvent.Type> POSITIVE_DEVICE_EVENT =
+            Sets.immutableEnumSet(DEVICE_ADDED,
+                    DEVICE_AVAILABILITY_CHANGED);
+
+    // poll period (in seconds) used when init is given a non-positive frequency
+    private static final int DEFAULT_POLL_FREQUENCY = 30;
+
     protected DeviceService deviceService;
+    protected GroupProviderService groupProviderService;
+    protected MastershipService mastershipService;
+
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    // single thread shared by the periodic poller and device-event handling
+    private final ScheduledExecutorService executor
+            = newSingleThreadScheduledExecutor(groupedThreads("GroupDriverProvider", "%d", log));
+    // handle of the currently scheduled periodic poll; null until init is called
+    private ScheduledFuture<?> poller = null;
 
     public GroupDriverProvider() {
         super(new ProviderId(SCHEME, PROVIDER_NAME));
     }
 
     /**
-     * Initializes the provider with the necessary device service.
+     * Initializes the provider with the necessary device service, group provider service,
+     * mastership service and poll frequency. Registers the internal device listener
+     * and (re)schedules the periodic group poll, cancelling any previously scheduled one.
      *
-     * @param deviceService device service
+     * @param deviceService        device service
+     * @param groupProviderService group provider service
+     * @param mastershipService    mastership service
+     * @param pollFrequency        group entry poll frequency in seconds; a non-positive
+     *                             value falls back to a default of 30 seconds
      */
-    void init(DeviceService deviceService) {
+    void init(DeviceService deviceService, GroupProviderService groupProviderService,
+              MastershipService mastershipService, int pollFrequency) {
         this.deviceService = deviceService;
+        this.groupProviderService = groupProviderService;
+        this.mastershipService = mastershipService;
+
+        deviceService.addListener(deviceListener);
+
+        if (poller != null && !poller.isCancelled()) {
+            poller.cancel(false);
+        }
+
+        // scheduleAtFixedRate throws IllegalArgumentException for a period <= 0,
+        // so substitute the default rather than failing initialization.
+        int period = pollFrequency;
+        if (period <= 0) {
+            log.warn("Invalid group poll frequency {}, falling back to {} seconds",
+                    pollFrequency, DEFAULT_POLL_FREQUENCY);
+            period = DEFAULT_POLL_FREQUENCY;
+        }
+
+        poller = executor.scheduleAtFixedRate(this::pollGroups, period,
+                period, TimeUnit.SECONDS);
     }
 
     @Override
@@ -60,6 +107,45 @@
         }
     }
 
+    /**
+     * Polls groups from every available, locally mastered, group-programmable device.
+     * <p>
+     * Runs as the periodic task; a task that throws is silently dropped from a
+     * fixed-rate schedule, so failures are logged here and never propagated.
+     */
+    private void pollGroups() {
+        try {
+            deviceService.getAvailableDevices().forEach(device -> {
+                if (mastershipService.isLocalMaster(device.id()) &&
+                        device.is(GroupProgrammable.class)) {
+                    try {
+                        pollDeviceGroups(device.id());
+                    } catch (Exception e) {
+                        log.warn("Failed to poll groups of device {}", device.id(), e);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.warn("Exception thrown while polling groups", e);
+        }
+    }
+
+    /**
+     * Reads the groups of the given device through its driver and pushes them
+     * to the group provider service.
+     *
+     * @param deviceId device identifier
+     */
+    private void pollDeviceGroups(DeviceId deviceId) {
+        GroupProgrammable programmable = getGroupProgrammable(deviceId);
+        if (programmable == null) {
+            // device is not group programmable
+            return;
+        }
+        Collection<Group> groups = programmable.getGroups();
+        groupProviderService.pushGroupMetrics(deviceId, groups);
+    }
+
     private GroupProgrammable getGroupProgrammable(DeviceId deviceId) {
         Device device = deviceService.getDevice(deviceId);
         if (device.is(GroupProgrammable.class)) {
@@ -69,4 +130,37 @@
             return null;
         }
     }
+
+    /**
+     * Polls the groups of a group-programmable device when it is added or its
+     * availability changes, provided it is available and mastered by this node.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            // run the handling on the provider's executor rather than the caller's thread
+            executor.execute(() -> handleEvent(event));
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return POSITIVE_DEVICE_EVENT.contains(event.type()) &&
+                    event.subject().is(GroupProgrammable.class);
+        }
+
+        private void handleEvent(DeviceEvent event) {
+            DeviceId deviceId = event.subject().id();
+            try {
+                // poll only devices this node masters and that are currently available
+                if (mastershipService.isLocalMaster(deviceId) &&
+                        deviceService.isAvailable(deviceId)) {
+                    pollDeviceGroups(deviceId);
+                }
+            } catch (Exception e) {
+                // an exception escaping a task on the scheduled executor is never reported
+                log.warn("Failed to poll groups of device {}", deviceId, e);
+            }
+        }
+    }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
index 31e95a4..bc3c64d 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
@@ -26,6 +26,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
@@ -54,6 +55,8 @@
 import java.util.Collections;
 import java.util.Dictionary;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.GROUP_READ;
 import static org.onosproject.security.AppPermission.Type.GROUP_WRITE;
@@ -76,6 +79,8 @@
     private final GroupStoreDelegate delegate = new InternalGroupStoreDelegate();
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
+    private final  GroupDriverProvider defaultProvider = new GroupDriverProvider();
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupStore store;
 
@@ -85,10 +90,18 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    private static final int DEFAULT_POLL_FREQUENCY = 30;
+    @Property(name = "fallbackGroupPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
+            label = "Frequency (in seconds) for polling groups via fallback provider")
+    private int fallbackGroupPollFrequency = DEFAULT_POLL_FREQUENCY;
+
     @Property(name = "purgeOnDisconnection", boolValue = false,
             label = "Purge entries associated with a device when the device goes offline")
     private boolean purgeOnDisconnection = false;
-    private final  GroupDriverProvider defaultProvider = new GroupDriverProvider();
+
 
     @Activate
     public void activate(ComponentContext context) {
@@ -114,7 +127,8 @@
         if (context != null) {
             readComponentConfiguration(context);
         }
-        defaultProvider.init(deviceService);
+        defaultProvider.init(deviceService, new InternalGroupProviderService(defaultProvider),
+                mastershipService, fallbackGroupPollFrequency);
     }
 
     @Override
@@ -140,6 +154,12 @@
             log.info("Configured. PurgeOnDisconnection is {}",
                     purgeOnDisconnection ? "enabled" : "disabled");
         }
+        String s = get(properties, "fallbackGroupPollFrequency");
+        try {
+            fallbackGroupPollFrequency = isNullOrEmpty(s) ? DEFAULT_POLL_FREQUENCY : Integer.parseInt(s);
+        } catch (NumberFormatException e) {
+            fallbackGroupPollFrequency = DEFAULT_POLL_FREQUENCY;
+        }
     }
 
     /**