OLT ability to remove a subscriber

Change-Id: I5fee9dd8189ae374bf39b0a74da5bd33304a3346
diff --git a/apps/olt/src/main/java/org/onosproject/olt/Olt.java b/apps/olt/src/main/java/org/onosproject/olt/Olt.java
index fdf82a0..d3120dd 100644
--- a/apps/olt/src/main/java/org/onosproject/olt/Olt.java
+++ b/apps/olt/src/main/java/org/onosproject/olt/Olt.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.olt;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -52,6 +54,7 @@
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -89,11 +92,16 @@
     private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0);
 
     private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
-                                                                groupedThreads("onos/olt-service",
-                                                                               "olt-installer-%d"));
+                                                                         groupedThreads("onos/olt-service",
+                                                                                        "olt-installer-%d"));
 
     private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
 
+    private Map<ConnectPoint, Set<ForwardingObjective.Builder>> objectives =
+            Maps.newConcurrentMap();
+
+    private Map<ConnectPoint, VlanId> subscribers = Maps.newConcurrentMap();
+
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
     private static final Class<AccessDeviceConfig> CONFIG_CLASS =
@@ -102,11 +110,12 @@
     private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory =
             new ConfigFactory<DeviceId, AccessDeviceConfig>(
                     SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
-        @Override
-        public AccessDeviceConfig createConfig() {
-            return new AccessDeviceConfig();
-        }
-    };
+                @Override
+                public AccessDeviceConfig createConfig() {
+                    return new AccessDeviceConfig();
+                }
+            };
+
 
     @Activate
     public void activate() {
@@ -152,7 +161,68 @@
 
     @Override
     public void removeSubscriber(ConnectPoint port) {
-        throw new UnsupportedOperationException();
+        AccessDeviceData olt = oltData.get(port.deviceId());
+
+        if (olt == null) {
+            log.warn("No data found for OLT device {}", port.deviceId());
+            return;
+        }
+
+        unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan());
+
+    }
+
+    private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
+                                       PortNumber subscriberPort, VlanId deviceVlan) {
+
+        //FIXME: This method is slightly ugly but it'll do until we have a better
+        // way to remove flows from the flow store.
+
+        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+        ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+
+        VlanId subscriberVlan = subscribers.remove(cp);
+
+        Set<ForwardingObjective.Builder> fwds = objectives.remove(cp);
+
+        if (fwds == null || fwds.size() != 2) {
+            log.warn("Unknown or incomplete subscriber at {}", cp);
+            return;
+        }
+
+
+        fwds.stream().forEach(
+                fwd -> flowObjectiveService.forward(deviceId,
+                                                 fwd.remove(new ObjectiveContext() {
+                                                     @Override
+                                                     public void onSuccess(Objective objective) {
+                                                         upFuture.complete(null);
+                                                     }
+
+                                                     @Override
+                                                     public void onError(Objective objective, ObjectiveError error) {
+                                                         upFuture.complete(error);
+                                                     }
+                                                 })));
+
+        upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+            if (upStatus == null && downStatus == null) {
+                post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNREGISTERED,
+                                           deviceId,
+                                           deviceVlan,
+                                           subscriberVlan));
+            } else if (downStatus != null) {
+                log.error("Subscriber with vlan {} on device {} " +
+                                  "on port {} failed downstream uninstallation: {}",
+                          subscriberVlan, deviceId, subscriberPort, downStatus);
+            } else if (upStatus != null) {
+                log.error("Subscriber with vlan {} on device {} " +
+                                  "on port {} failed upstream uninstallation: {}",
+                          subscriberVlan, deviceId, subscriberPort, upStatus);
+            }
+        }, oltInstallers);
 
     }
 
@@ -190,46 +260,53 @@
                 .build();
 
 
-        ForwardingObjective upFwd = DefaultForwardingObjective.builder()
+        ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder()
                 .withFlag(ForwardingObjective.Flag.VERSATILE)
                 .withPriority(1000)
                 .makePermanent()
                 .withSelector(upstream)
                 .fromApp(appId)
-                .withTreatment(upstreamTreatment)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        upFuture.complete(null);
-                    }
+                .withTreatment(upstreamTreatment);
 
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        upFuture.complete(error);
-                    }
-                });
 
-        ForwardingObjective downFwd = DefaultForwardingObjective.builder()
+        ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder()
                 .withFlag(ForwardingObjective.Flag.VERSATILE)
                 .withPriority(1000)
                 .makePermanent()
                 .withSelector(downstream)
                 .fromApp(appId)
-                .withTreatment(downstreamTreatment)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        downFuture.complete(null);
-                    }
+                .withTreatment(downstreamTreatment);
 
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        downFuture.complete(error);
-                    }
-                });
+        ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
 
-        flowObjectiveService.forward(deviceId, upFwd);
-        flowObjectiveService.forward(deviceId, downFwd);
+        subscribers.put(cp, subscriberVlan);
+        objectives.put(cp, Sets.newHashSet(upFwd, downFwd));
+
+
+        flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                upFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                upFuture.complete(error);
+            }
+        }));
+
+
+        flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                downFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                downFuture.complete(error);
+            }
+        }));
 
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (upStatus == null && downStatus == null) {
@@ -288,20 +365,20 @@
         public void event(NetworkConfigEvent event) {
             switch (event.type()) {
 
-            case CONFIG_ADDED:
-            case CONFIG_UPDATED:
-                if (event.configClass().equals(CONFIG_CLASS)) {
-                    AccessDeviceConfig config =
-                            networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
-                    if (config != null) {
-                        oltData.put(config.getOlt().deviceId(), config.getOlt());
+                case CONFIG_ADDED:
+                case CONFIG_UPDATED:
+                    if (event.configClass().equals(CONFIG_CLASS)) {
+                        AccessDeviceConfig config =
+                                networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
+                        if (config != null) {
+                            oltData.put(config.getOlt().deviceId(), config.getOlt());
+                        }
                     }
-                }
-                break;
-            case CONFIG_UNREGISTERED:
-            case CONFIG_REMOVED:
-            default:
-                break;
+                    break;
+                case CONFIG_UNREGISTERED:
+                case CONFIG_REMOVED:
+                default:
+                    break;
             }
         }
     }