olt installs default rules on startup and when a port shows up

Change-Id: I0db62db020f94500aeae7191f7681745e1268672
diff --git a/apps/olt/src/main/java/org/onosproject/olt/Olt.java b/apps/olt/src/main/java/org/onosproject/olt/Olt.java
index d3120dd..4b2f55b 100644
--- a/apps/olt/src/main/java/org/onosproject/olt/Olt.java
+++ b/apps/olt/src/main/java/org/onosproject/olt/Olt.java
@@ -23,12 +23,15 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
@@ -42,7 +45,10 @@
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.Objective;
@@ -126,6 +132,7 @@
         networkConfig.registerConfigFactory(configFactory);
         networkConfig.addListener(configListener);
 
+
         networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
                 subject -> {
                     AccessDeviceConfig config = networkConfig.getConfig(subject, AccessDeviceConfig.class);
@@ -136,6 +143,12 @@
                 }
         );
 
+        oltData.keySet().stream()
+                .flatMap(did -> deviceService.getPorts(did).stream())
+                .filter(p -> oltData.get(p.element().id()).uplink() != p.number())
+                .filter(p -> p.isEnabled())
+                .forEach(p -> installFilteringObjectives((DeviceId) p.element().id(), p));
+
         log.info("Started with Application ID {}", appId.id());
     }
 
@@ -195,17 +208,17 @@
 
         fwds.stream().forEach(
                 fwd -> flowObjectiveService.forward(deviceId,
-                                                 fwd.remove(new ObjectiveContext() {
-                                                     @Override
-                                                     public void onSuccess(Objective objective) {
-                                                         upFuture.complete(null);
-                                                     }
+                                                    fwd.remove(new ObjectiveContext() {
+                                                        @Override
+                                                        public void onSuccess(Objective objective) {
+                                                            upFuture.complete(null);
+                                                        }
 
-                                                     @Override
-                                                     public void onError(Objective objective, ObjectiveError error) {
-                                                         upFuture.complete(error);
-                                                     }
-                                                 })));
+                                                        @Override
+                                                        public void onError(Objective objective, ObjectiveError error) {
+                                                            upFuture.complete(error);
+                                                        }
+                                                    })));
 
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (upStatus == null && downStatus == null) {
@@ -327,6 +340,57 @@
 
     }
 
+    private void installFilteringObjectives(DeviceId devId, Port port) {
+        FilteringObjective eapol = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+                .withMeta(DefaultTrafficTreatment.builder()
+                                  .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(1000)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Eapol filter for {} on {} installed.",
+                                 devId, port);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.info("Eapol filter for {} on {} failed because {}",
+                                 devId, port, error);
+                    }
+                });
+
+
+        FilteringObjective igmp = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+                .withMeta(DefaultTrafficTreatment.builder()
+                                  .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(1000)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Igmp filter for {} on {} installed.",
+                                 devId, port);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.info("Igmp filter for {} on {} failed because {}.",
+                                 devId, port, error);
+                    }
+                });
+
+        flowObjectiveService.filter(devId, eapol);
+        flowObjectiveService.filter(devId, igmp);
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
@@ -336,7 +400,20 @@
                 return;
             }
             switch (event.type()) {
+                //TODO: Port handling and bookkeeping should be inproved once
+                // olt firmware handles correct behaviour.
                 case PORT_ADDED:
+                    if (event.port().isEnabled()) {
+                        installFilteringObjectives(devId, event.port());
+                    }
+                    break;
+                case PORT_REMOVED:
+                    AccessDeviceData olt = oltData.get(devId);
+                    unprovisionSubscriber(devId, olt.uplink(),
+                                          event.port().number(),
+                                          olt.vlan());
+                    installFilteringObjectives(devId, event.port());
+                    break;
                 case PORT_UPDATED:
                     break;
                 case DEVICE_ADDED:
@@ -352,7 +429,6 @@
                 case DEVICE_UPDATED:
                 case DEVICE_SUSPENDED:
                 case DEVICE_AVAILABILITY_CHANGED:
-                case PORT_REMOVED:
                 case PORT_STATS_UPDATED:
                 default:
                     return;
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index e4b50a1..8980733 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -120,7 +120,8 @@
                     .limit(1)
                     .findFirst().get();
 
-            if (output != null && !output.port().equals(PortNumber.CONTROLLER)) {
+            if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
+                log.error("OLT can only filter packet to controller");
                 fail(filter, ObjectiveError.UNSUPPORTED);
                 return;
             }
@@ -142,15 +143,19 @@
             return;
         }
 
-        if (ethType.ethType().equals(EthType.EtherType.EAPOL)) {
+        if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
             provisionEapol(filter, ethType, output);
-        } else if (ethType.ethType().equals(EthType.EtherType.IPV4)) {
+        } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
             IPProtocolCriterion ipProto = (IPProtocolCriterion)
                     filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
             if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
                 provisionIGMP(filter, ethType, ipProto, output);
+            } else {
+                log.error("OLT can only filter igmp");
+                fail(filter, ObjectiveError.UNSUPPORTED);
             }
         } else {
+            log.error("OLT can only filter eapol and igmp");
             fail(filter, ObjectiveError.UNSUPPORTED);
         }
 
@@ -389,6 +394,7 @@
                 .makePermanent()
                 .withSelector(selector)
                 .withTreatment(treatment)
+                .withPriority(filter.priority())
                 .build();
 
         FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
@@ -446,7 +452,7 @@
 
     private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
         return criteria.stream()
-                .filter(c -> c.type().equals(Criterion.Type.ETH_TYPE))
+                .filter(c -> c.type().equals(type))
                 .limit(1)
                 .findFirst().orElse(null);
     }