Fix for [ONOS-5824]

Changes:
- Redesigns DefaultSingleTablePipeline;
- Changes the timeout of test to reduce the false-negatives;

Change-Id: I15fa20fd8a15908e70bc22de7913367b9ef113c7
diff --git a/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java b/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java
index 2782b76..d6405f0 100644
--- a/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java
@@ -165,7 +165,9 @@
         }
 
         try {
-            if (latch.await(1000 + count * 30, TimeUnit.MILLISECONDS)) {
+            // In this way with the tests in place the timeout will be
+            // 61 seconds.
+            if (latch.await(1000 + count * 60, TimeUnit.MILLISECONDS)) {
                 printResults(count);
             } else {
                 print("Failure: %d intents not installed", latch.getCount());
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
index a004fde..426f36c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
@@ -41,13 +41,14 @@
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.group.DefaultGroupKey;
-import org.onosproject.net.group.GroupKey;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.onosproject.net.flowobjective.Objective.Operation.ADD;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -62,15 +63,26 @@
     private FlowObjectiveStore flowObjectiveStore;
     private DeviceId deviceId;
 
-    private Cache<Integer, NextObjective> pendingNext;
-
     private KryoNamespace appKryo = new KryoNamespace.Builder()
-            .register(GroupKey.class)
-            .register(DefaultGroupKey.class)
+            .register(KryoNamespaces.API)
             .register(SingleGroup.class)
-            .register(byte[].class)
             .build("DefaultSingleTablePipeline");
 
+    // Fast path for the installation. If we don't find the nextobjective in
+    // the cache, as fallback mechanism we will try to retrieve the treatments
+    // from the store. It is safe to use this cache for the addition, while it
+    // should be avoided for the removal. This cache from Guava does not offer
+    // thread-safe read (the read does not take the lock).
+    private Cache<Integer, NextObjective> pendingAddNext = CacheBuilder.newBuilder()
+            .expireAfterWrite(20, TimeUnit.SECONDS)
+            .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+                if (notification.getCause() == RemovalCause.EXPIRED) {
+                    notification.getValue().context()
+                            .ifPresent(c -> c.onError(notification.getValue(),
+                                                      ObjectiveError.FLOWINSTALLATIONFAILED));
+                }
+            }).build();
+
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
@@ -80,15 +92,6 @@
         flowRuleService = serviceDirectory.get(FlowRuleService.class);
         flowObjectiveStore = serviceDirectory.get(FlowObjectiveStore.class);
 
-        pendingNext = CacheBuilder.newBuilder()
-                .expireAfterWrite(20, TimeUnit.SECONDS)
-                .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
-                    if (notification.getCause() == RemovalCause.EXPIRED) {
-                        notification.getValue().context()
-                                .ifPresent(c -> c.onError(notification.getValue(),
-                                        ObjectiveError.FLOWINSTALLATIONFAILED));
-                    }
-                }).build();
     }
 
     @Override
@@ -140,7 +143,6 @@
     @Override
     public void forward(ForwardingObjective fwd) {
         TrafficSelector selector = fwd.selector();
-
         if (fwd.treatment() != null) {
             // Deal with SPECIFIC and VERSATILE in the same manner.
             FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
@@ -158,28 +160,56 @@
             installObjective(ruleBuilder, fwd);
 
         } else {
-            NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
-            if (nextObjective != null) {
-                pendingNext.invalidate(fwd.nextId());
-                nextObjective.next().forEach(treat -> {
-                    FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
-                            .forDevice(deviceId)
-                            .withSelector(selector)
-                            .fromApp(fwd.appId())
-                            .withPriority(fwd.priority())
-                            .withTreatment(treat);
-
-                    if (fwd.permanent()) {
-                        ruleBuilder.makePermanent();
-                    } else {
-                        ruleBuilder.makeTemporary(fwd.timeout());
+            NextObjective nextObjective;
+            NextGroup next;
+            TrafficTreatment treatment;
+            if (fwd.op() == ADD) {
+                // Give a try to the cache. Doing an operation
+                // on the store seems to be very expensive.
+                nextObjective = pendingAddNext.getIfPresent(fwd.nextId());
+                // If the next objective is not present
+                // We will try with the store
+                if (nextObjective == null) {
+                    next = flowObjectiveStore.getNextGroup(fwd.nextId());
+                    // We verify that next was in the store and then de-serialize
+                    // the treatment in order to re-build the flow rule.
+                    if (next == null) {
+                        fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+                        return;
                     }
-                    installObjective(ruleBuilder, fwd);
-                });
+                    treatment = appKryo.deserialize(next.data());
+                } else {
+                    pendingAddNext.invalidate(fwd.nextId());
+                    treatment = nextObjective.next().iterator().next();
+                }
             } else {
-                fwd.context().ifPresent(c -> c.onError(fwd,
-                        ObjectiveError.GROUPMISSING));
+                // We get the NextGroup from the remove operation.
+                // Doing an operation on the store seems to be very expensive.
+                next = flowObjectiveStore.removeNextGroup(fwd.nextId());
+                if (next == null) {
+                    fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+                    return;
+                }
+                treatment = appKryo.deserialize(next.data());
             }
+            // If the treatment is null we cannot re-build the original flow
+            if (treatment == null)  {
+                fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+                return;
+            }
+            // Finally we build the flow rule and push to the flowrule subsystem.
+            FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                    .forDevice(deviceId)
+                    .withSelector(selector)
+                    .fromApp(fwd.appId())
+                    .withPriority(fwd.priority())
+                    .withTreatment(treatment);
+            if (fwd.permanent()) {
+                ruleBuilder.makePermanent();
+            } else {
+                ruleBuilder.makeTemporary(fwd.timeout());
+            }
+            installObjective(ruleBuilder, fwd);
         }
     }
 
@@ -213,34 +243,45 @@
 
     @Override
     public void next(NextObjective nextObjective) {
-
-        pendingNext.put(nextObjective.id(), nextObjective);
-        flowObjectiveStore.putNextGroup(nextObjective.id(),
-                new SingleGroup(new DefaultGroupKey(appKryo.serialize(nextObjective.id()))));
+        switch (nextObjective.op()) {
+            case ADD:
+                // We insert the value in the cache
+                pendingAddNext.put(nextObjective.id(), nextObjective);
+                // Then in the store, this will unblock the queued fwd obj
+                flowObjectiveStore.putNextGroup(
+                        nextObjective.id(),
+                        new SingleGroup(nextObjective.next().iterator().next())
+                );
+                break;
+            case REMOVE:
+                break;
+            default:
+                log.warn("Unsupported operation {}", nextObjective.op());
+        }
         nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
     }
 
     @Override
     public List<String> getNextMappings(NextGroup nextGroup) {
         // Default single table pipeline does not use nextObjectives or groups
-        return null;
+        return Collections.emptyList();
     }
 
     private class SingleGroup implements NextGroup {
 
-        private final GroupKey key;
+        private TrafficTreatment nextActions;
 
-        public SingleGroup(GroupKey key) {
-            this.key = key;
-        }
-
-        public GroupKey key() {
-            return key;
+        SingleGroup(TrafficTreatment next) {
+            this.nextActions = next;
         }
 
         @Override
         public byte[] data() {
-            return appKryo.serialize(key);
+            return appKryo.serialize(nextActions);
+        }
+
+        public TrafficTreatment treatment() {
+            return nextActions;
         }
 
     }