Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least I believe so)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index dd0bcaa..70d35aa 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,25 +15,13 @@
  */
 package org.onosproject.provider.of.flow.impl;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.slf4j.LoggerFactory.getLogger;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,19 +29,16 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
 import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
-import org.onosproject.net.topology.TopologyService;
 import org.onosproject.openflow.controller.Dpid;
 import org.onosproject.openflow.controller.OpenFlowController;
 import org.onosproject.openflow.controller.OpenFlowEventListener;
@@ -63,6 +48,7 @@
 import org.projectfloodlight.openflow.protocol.OFActionType;
 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFErrorType;
 import org.projectfloodlight.openflow.protocol.OFFlowMod;
 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
@@ -75,21 +61,22 @@
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.projectfloodlight.openflow.protocol.action.OFAction;
 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
 import org.projectfloodlight.openflow.types.OFPort;
 import org.slf4j.Logger;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
 
 /**
  * Provider which uses an OpenFlow controller to detect network
@@ -98,8 +85,6 @@
 @Component(immediate = true)
 public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
 
-    enum BatchState { STARTED, FINISHED, CANCELLED }
-
     private static final int LOWEST_PRIORITY = 0;
 
     private final Logger log = getLogger(getClass());
@@ -110,22 +95,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenFlowController controller;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected TopologyService topologyService;
 
     private FlowRuleProviderService providerService;
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
 
-    // FIXME: This should be an expiring map to ensure futures that don't have
-    // a future eventually get garbage collected.
-    private final Map<Long, InstallationFuture> pendingFutures = new ConcurrentHashMap<>();
-
-    private final Map<Long, InstallationFuture> pendingFMs = new ConcurrentHashMap<>();
+    private Cache<Long, InternalCacheEntry> pendingBatches;
 
     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
 
-    private final AtomicLong xidCounter = new AtomicLong(1);
 
     /**
      * Creates an OpenFlow host provider.
@@ -140,6 +118,16 @@
         controller.addListener(listener);
         controller.addEventListener(listener);
 
+        pendingBatches = CacheBuilder.newBuilder()
+                .expireAfterWrite(10, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        providerService.batchOperationCompleted(notification.getKey(),
+                                                                notification.getValue().failedCompletion());
+                    }
+                }).build();
+
+
         for (OpenFlowSwitch sw : controller.getSwitches()) {
             FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
             fsc.start();
@@ -160,8 +148,8 @@
 
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            applyRule(flowRules[i]);
+        for (FlowRule flowRule : flowRules) {
+            applyRule(flowRule);
         }
     }
 
@@ -179,8 +167,8 @@
 
     @Override
     public void removeFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            removeRule(flowRules[i]);
+        for (FlowRule flowRule : flowRules) {
+            removeRule(flowRule);
         }
 
     }
@@ -203,36 +191,20 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
-        final Set<Dpid> sws = Sets.newConcurrentHashSet();
-        final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
 
-        /*
-         * Use identity hash map for reference equality as we could have equal
-         * flow mods for different switches.
-         */
-        Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
-        Map<OFFlowMod, OpenFlowSwitch.TableType> modTypes = Maps.newIdentityHashMap();
+    public void executeBatch(FlowRuleBatchOperation batch) {
+
+        pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
+
+
+        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId().uri()));
+        OFFlowMod mod;
+
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-            FlowRule flowRule = fbe.target();
-            final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
-            OpenFlowSwitch sw = controller.getSwitch(dpid);
-            if (sw == null) {
-                /*
-                 * if a switch we are supposed to install to is gone then
-                 * cancel (ie. rollback) the work that has been done so far
-                 * and return the associated future.
-                 */
-                InstallationFuture failed = new InstallationFuture(sws, fmXids);
-                failed.cancel(true);
-                return failed;
-            }
-            sws.add(dpid);
-            final Long flowModXid = xidCounter.getAndIncrement();
+
             FlowModBuilder builder =
-                    FlowModBuilder.builder(flowRule, sw.factory(),
-                                           Optional.of(flowModXid));
-            OFFlowMod mod = null;
+                    FlowModBuilder.builder(fbe.target(), sw.factory(),
+                                           Optional.of(batch.id()));
             switch (fbe.operator()) {
                 case ADD:
                     mod = builder.buildFlowAdd();
@@ -244,34 +216,16 @@
                     mod = builder.buildFlowMod();
                     break;
                 default:
-                    log.error("Unsupported batch operation {}", fbe.operator());
-            }
-            if (mod != null) {
-                mods.put(mod, sw);
-                modTypes.put(mod, getTableType(flowRule.type()));
-                fmXids.put(flowModXid, fbe);
-            } else {
-                log.error("Conversion of flowrule {} failed.", flowRule);
-            }
+                    log.error("Unsupported batch operation {}; skipping flowmod {}",
+                              fbe.operator(), fbe);
+                    continue;
+                }
+            sw.sendMsg(mod);
         }
-        InstallationFuture installation = new InstallationFuture(sws, fmXids);
-        for (Long xid : fmXids.keySet()) {
-            pendingFMs.put(xid, installation);
-        }
-
-        pendingFutures.put(installation.xid(), installation);
-        for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
-            OpenFlowSwitch sw = entry.getValue();
-            OFFlowMod mod = entry.getKey();
-            OpenFlowSwitch.TableType tableType = modTypes.get(mod);
-            if (tableType == OpenFlowSwitch.TableType.NONE) {
-                sw.sendMsg(mod);
-            } else {
-                sw.sendMsg(mod, tableType);
-            }
-        }
-        installation.verify();
-        return installation;
+        OFBarrierRequest.Builder builder = sw.factory()
+                .buildBarrierRequest()
+                .setXid(batch.id());
+        sw.sendMsg(builder.build());
     }
 
     private OpenFlowSwitch.TableType getTableType(FlowRule.Type type) {
@@ -287,13 +241,12 @@
         }
     }
 
+
+
+
     private class InternalFlowProvider
             implements OpenFlowSwitchListener, OpenFlowEventListener {
 
-
-        private final Multimap<DeviceId, FlowEntry> completeEntries =
-                ArrayListMultimap.create();
-
         @Override
         public void switchAdded(Dpid dpid) {
             FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
@@ -320,7 +273,6 @@
 
         @Override
         public void handleMessage(Dpid dpid, OFMessage msg) {
-            InstallationFuture future = null;
             switch (msg.getType()) {
                 case FLOW_REMOVED:
                     OFFlowRemoved removed = (OFFlowRemoved) msg;
@@ -334,22 +286,42 @@
                     }
                     break;
                 case BARRIER_REPLY:
-                    future = pendingFutures.get(msg.getXid());
-                    if (future != null) {
-                        future.satisfyRequirement(dpid);
-                    } else {
-                        log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+                    try {
+                        InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+                        if (entry != null) {
+                            providerService.batchOperationCompleted(msg.getXid(), entry.completed());
+                        } else {
+                            log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+                        }
+                    } finally {
+                        pendingBatches.invalidate(msg.getXid());
                     }
                     break;
                 case ERROR:
                     log.warn("received Error message {} from {}", msg, dpid);
-                    future = pendingFMs.get(msg.getXid());
-                    if (future != null) {
-                        future.fail((OFErrorMsg) msg, dpid);
+
+                    OFErrorMsg error = (OFErrorMsg) msg;
+                    if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+                        OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+                        if (fmFailed.getData().getParsedMessage().isPresent()) {
+                            OFMessage m = fmFailed.getData().getParsedMessage().get();
+                            OFFlowMod fm = (OFFlowMod) m;
+                            InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+                            if (entry != null) {
+                                entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
+                            } else {
+                                log.error("No matching batch for this error: {}", error);
+                            }
+                        } else {
+                            //FIXME: Potentially add flowtracking to avoid this message.
+                            log.error("Flow installation failed but switch didn't" +
+                                              " tell us which one.");
+                        }
                     } else {
-                        log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
+                        log.warn("Received error {}", error);
                     }
-                    break;
+
+
                 default:
                     log.debug("Unhandled message type: {}", msg.getType());
             }
@@ -402,198 +374,50 @@
 
     }
 
-    private class InstallationFuture implements Future<CompletedBatchOperation> {
+    /**
+     * The internal cache entry holding the original request as well
+     * as accumulating any failures along the way.
+     *
+     * If this entry is evicted from the cache then the entire operation
+     * is considered failed. Otherwise, only the failures reported by the device
+     * will be propagated up.
+     */
+    private class InternalCacheEntry {
 
-        // barrier xid
-        private final Long xid;
-        // waiting for barrier reply from...
-        private final Set<Dpid> sws;
-        private final AtomicBoolean ok = new AtomicBoolean(true);
-        // FlowMod xid ->
-        private final Map<Long, FlowRuleBatchEntry> fms;
+        private final FlowRuleBatchOperation operation;
+        private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
 
-
-        private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
-        // Failed batch operation id
-        private Long failedId;
-
-        private final CountDownLatch countDownLatch;
-        private BatchState state;
-
-        public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
-            this.xid = xidCounter.getAndIncrement();
-            this.state = BatchState.STARTED;
-            this.sws = sws;
-            this.fms = fmXids;
-            countDownLatch = new CountDownLatch(sws.size());
+        public InternalCacheEntry(FlowRuleBatchOperation operation) {
+            this.operation = operation;
         }
 
-        public Long xid() {
-            return xid;
+        /**
+         * Appends a failed rule to the set of failed items.
+         * @param rule the failed rule
+         */
+        public void appendFailure(FlowRule rule) {
+            failures.add(rule);
         }
 
-        public void fail(OFErrorMsg msg, Dpid dpid) {
-
-            ok.set(false);
-            FlowEntry fe = null;
-            FlowRuleBatchEntry fbe = fms.get(msg.getXid());
-            failedId = fbe.id();
-            FlowRule offending = fbe.target();
-            //TODO handle specific error msgs
-            switch (msg.getErrType()) {
-                case BAD_ACTION:
-                    OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
-                                              bad.getCode().ordinal());
-                    break;
-                case BAD_INSTRUCTION:
-                    OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
-                                              badins.getCode().ordinal());
-                    break;
-                case BAD_MATCH:
-                    OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
-                                              badMatch.getCode().ordinal());
-                    break;
-                case BAD_REQUEST:
-                    OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
-                                              badReq.getCode().ordinal());
-                    break;
-                case FLOW_MOD_FAILED:
-                    OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
-                                              fmFail.getCode().ordinal());
-                    break;
-                case EXPERIMENTER:
-                case GROUP_MOD_FAILED:
-                case HELLO_FAILED:
-                case METER_MOD_FAILED:
-                case PORT_MOD_FAILED:
-                case QUEUE_OP_FAILED:
-                case ROLE_REQUEST_FAILED:
-                case SWITCH_CONFIG_FAILED:
-                case TABLE_FEATURES_FAILED:
-                case TABLE_MOD_FAILED:
-                    fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
-                    break;
-                default:
-                    log.error("Unknown error type {}", msg.getErrType());
-
-            }
-            offendingFlowMods.add(fe);
-
-            removeRequirement(dpid);
+        /**
+         * Fails the entire batch and returns the failed operation.
+         * @return the failed operation
+         */
+        public CompletedBatchOperation failedCompletion() {
+            Set<FlowRule> fails = operation.getOperations().stream()
+                    .map(op -> op.target()).collect(Collectors.toSet());
+            return new CompletedBatchOperation(false, Collections.unmodifiableSet(fails), operation.deviceId());
         }
 
-
-        public void satisfyRequirement(Dpid dpid) {
-            log.debug("Satisfaction from switch {}", dpid);
-            removeRequirement(dpid);
+        /**
+         * Returns the completed operation and whether the batch succeeded.
+         * @return the completed operation
+         */
+        public CompletedBatchOperation completed() {
+            return new CompletedBatchOperation(failures.isEmpty(),
+                                               Collections.unmodifiableSet(failures), operation.deviceId());
         }
 
-
-        public void verify() {
-            checkState(!sws.isEmpty());
-            for (Dpid dpid : sws) {
-                OpenFlowSwitch sw = controller.getSwitch(dpid);
-                OFBarrierRequest.Builder builder = sw.factory()
-                        .buildBarrierRequest()
-                        .setXid(xid);
-                sw.sendMsg(builder.build());
-            }
-        }
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            if (isDone()) {
-                return false;
-            }
-            ok.set(false);
-            this.state = BatchState.CANCELLED;
-            cleanUp();
-            for (FlowRuleBatchEntry fbe : fms.values()) {
-                if (fbe.operator() == FlowRuleOperation.ADD ||
-                        fbe.operator() == FlowRuleOperation.MODIFY) {
-                    removeFlowRule(fbe.target());
-                } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
-                    applyRule(fbe.target());
-                }
-
-            }
-            return true;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return this.state == BatchState.CANCELLED;
-        }
-
-        @Override
-        public boolean isDone() {
-            return this.state == BatchState.FINISHED || isCancelled();
-        }
-
-        @Override
-        public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
-            countDownLatch.await();
-            this.state = BatchState.FINISHED;
-            Set<Long> failedIds = (failedId != null) ?  Sets.newHashSet(failedId) : Collections.emptySet();
-            CompletedBatchOperation result =
-                    new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
-            //FIXME do cleanup here (moved by BOC)
-            cleanUp();
-            return result;
-        }
-
-        @Override
-        public CompletedBatchOperation get(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException,
-                TimeoutException {
-            if (countDownLatch.await(timeout, unit)) {
-                this.state = BatchState.FINISHED;
-                Set<Long> failedIds = (failedId != null) ?  Sets.newHashSet(failedId) : Collections.emptySet();
-                CompletedBatchOperation result =
-                        new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
-                // FIXME do cleanup here (moved by BOC)
-                cleanUp();
-                return result;
-            }
-            throw new TimeoutException(this.toString());
-        }
-
-        private void cleanUp() {
-            if (isDone() || isCancelled()) {
-                pendingFutures.remove(xid);
-                for (Long xid : fms.keySet()) {
-                    pendingFMs.remove(xid);
-                }
-            }
-        }
-
-        private void removeRequirement(Dpid dpid) {
-            countDownLatch.countDown();
-            sws.remove(dpid);
-            //FIXME don't do cleanup here (moved by BOC)
-            //cleanUp();
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                    .add("xid", xid)
-                    .add("pending devices", sws)
-                    .add("devices in batch",
-                         fms.values().stream()
-                             .map((fbe) -> fbe.target().deviceId())
-                             .distinct().collect(Collectors.toList()))
-                    .add("failedId", failedId)
-                    .add("latchCount", countDownLatch.getCount())
-                    .add("state", state)
-                    .add("no error?", ok.get())
-                    .toString();
-        }
     }
 
 }