Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least i believe it)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 09aa401..3d66386 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -15,33 +15,15 @@
  */
 package org.onosproject.store.flow.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.namedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.hazelcast.core.IMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -49,8 +31,11 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
@@ -67,6 +52,7 @@
 import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleEvent.Type;
+import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
@@ -79,27 +65,37 @@
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.hz.AbstractHazelcastStore;
 import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.DecodeTo;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
 import org.slf4j.Logger;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.hazelcast.core.IMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -112,12 +108,10 @@
 
     private final Logger log = getLogger(getClass());
 
-    // primary data:
-    //  read/write needs to be locked
-    private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
-    // store entries as a pile of rules, no info about device tables
-    private final Multimap<DeviceId, StoredFlowEntry> flowEntries
-        = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
+    private InternalFlowTable flowTable = new InternalFlowTable();
+
+    /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+            flowEntries = new ConcurrentHashMap<>();*/
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
@@ -131,23 +125,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
-    private final AtomicInteger localBatchIdGen = new AtomicInteger();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
 
-    private int pendingFutureTimeoutMinutes = 5;
-
-    private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
-            CacheBuilder.newBuilder()
-                .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
-                .removalListener(new TimeoutFuture())
-                .build();
+    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
 
     // Cache of SMaps used for backup data.  each SMap contain device flow table
     private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
 
 
-    private final ExecutorService futureListeners =
-            Executors.newCachedThreadPool(namedThreads("onos-flowstore-peer-responders"));
-
     private final ExecutorService backupExecutors =
             Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
 
@@ -169,6 +155,8 @@
 
     private ReplicaInfoEventListener replicaInfoEventListener;
 
+    private IdGenerator idGenerator;
+
     @Override
     @Activate
     public void activate() {
@@ -176,22 +164,33 @@
         super.serializer = SERIALIZER;
         super.theInstance = storeService.getHazelcastInstance();
 
+        idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
+
         // Cache to create SMap on demand
         smaps = CacheBuilder.newBuilder()
-                    .softValues()
-                    .build(new SMapLoader());
+                .softValues()
+                .build(new SMapLoader());
 
         final NodeId local = clusterService.getLocalNode().id();
 
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
 
+        clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
+                log.trace("received completed notification for {}", event);
+                notifyDelegate(event);
+            }
+        });
+
         clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
 
             @Override
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.trace("received get flow entry request for {}", rule);
-                FlowEntry flowEntry = getFlowEntryInternal(rule);
+                FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
                 try {
                     message.respond(SERIALIZER.encode(flowEntry));
                 } catch (IOException e) {
@@ -206,7 +205,7 @@
             public void handle(ClusterMessage message) {
                 DeviceId deviceId = SERIALIZER.decode(message.payload());
                 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
-                Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
+                Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
                 try {
                     message.respond(SERIALIZER.encode(flowEntries));
                 } catch (IOException e) {
@@ -272,11 +271,11 @@
         }
 
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return getFlowEntryInternal(rule);
+            return flowTable.getFlowEntry(rule);
         }
 
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), rule.deviceId());
+                  replicaInfo.master().orNull(), rule.deviceId());
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -292,19 +291,7 @@
         return null;
     }
 
-    private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-        flowEntriesLock.readLock().lock();
-        try {
-            for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
-                if (f.equals(rule)) {
-                    return f;
-                }
-            }
-        } finally {
-            flowEntriesLock.readLock().unlock();
-        }
-        return null;
-    }
+
 
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -317,11 +304,11 @@
         }
 
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return getFlowEntriesInternal(deviceId);
+            return flowTable.getFlowEntries(deviceId);
         }
 
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), deviceId);
+                  replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -337,30 +324,26 @@
         return Collections.emptyList();
     }
 
-    private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-        flowEntriesLock.readLock().lock();
-        try {
-            Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
-            if (rules == null) {
-                return Collections.emptySet();
-            }
-            return ImmutableSet.copyOf(rules);
-        } finally {
-            flowEntriesLock.readLock().unlock();
-        }
-    }
+
 
     @Override
     public void storeFlowRule(FlowRule rule) {
-        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+        storeBatch(new FlowRuleBatchOperation(
+                Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+                rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
-    public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+    public void storeBatch(FlowRuleBatchOperation operation) {
+
 
         if (operation.getOperations().isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true,
-                                                                       Collections.<FlowRule>emptySet()));
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(true, Collections.emptySet(),
+                                                operation.deviceId())));
+            return;
         }
 
         DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
@@ -369,110 +352,129 @@
 
         if (!replicaInfo.master().isPresent()) {
             log.warn("Failed to storeBatch: No master for {}", deviceId);
-            // TODO: revisit if this should be "success" from Future point of view
-            // with every FlowEntry failed
-            return Futures.immediateFailedFuture(new IOException("Failed to storeBatch: No master for " + deviceId));
+
+            Set<FlowRule> allFailures = operation.getOperations().stream()
+                    .map(op -> op.getTarget())
+                    .collect(Collectors.toSet());
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(false, allFailures, operation.deviceId())));
+            return;
         }
 
         final NodeId local = clusterService.getLocalNode().id();
         if (replicaInfo.master().get().equals(local)) {
-            return storeBatchInternal(operation);
+            storeBatchInternal(operation);
+            return;
         }
 
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), deviceId);
+                  replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
                 local,
                 APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
+        //CompletedBatchOperation response;
         try {
             ListenableFuture<byte[]> responseFuture =
                     clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
-        } catch (IOException e) {
-            return Futures.immediateFailedFuture(e);
+            /*response =
+                    Futures.transform(responseFuture,
+                                      new DecodeTo<CompletedBatchOperation>(SERIALIZER))
+                            .get(500 * operation.size(), TimeUnit.MILLISECONDS);
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
+
+        } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
+            log.warn("Failed to storeBatch: {}", e.getMessage());
+
+            Set<FlowRule> allFailures = operation.getOperations().stream()
+                    .map(op -> op.getTarget())
+                    .collect(Collectors.toSet());
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(false, allFailures, deviceId)));
+            return;
         }
+
     }
 
-    private ListenableFuture<CompletedBatchOperation>
-                        storeBatchInternal(FlowRuleBatchOperation operation) {
+    private void storeBatchInternal(FlowRuleBatchOperation operation) {
 
-        final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
-        final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
-        DeviceId did = null;
+        final DeviceId did = operation.deviceId();
+        //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
+        Set<FlowRuleBatchEntry> currentOps;
 
 
-        flowEntriesLock.writeLock().lock();
-        try {
-            for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
-                FlowRule flowRule = batchEntry.target();
-                FlowRuleOperation op = batchEntry.operator();
-                if (did == null) {
-                    did = flowRule.deviceId();
-                }
-                if (op.equals(FlowRuleOperation.REMOVE)) {
-                    StoredFlowEntry entry = getFlowEntryInternal(flowRule);
-                    if (entry != null) {
-                        entry.setState(FlowEntryState.PENDING_REMOVE);
-                        toRemove.add(batchEntry);
+        currentOps = operation.getOperations().stream().map(
+                op -> {
+                    StoredFlowEntry entry;
+                    switch (op.getOperator()) {
+                        case ADD:
+                            entry = new DefaultFlowEntry(op.getTarget());
+                            // always add requested FlowRule

+                            // Note: 2 equal FlowEntry may have different treatment
+                            flowTable.remove(entry.deviceId(), entry);
+                            flowTable.add(entry);
+
+                            return op;
+                        case REMOVE:
+                            entry = flowTable.getFlowEntry(op.target());
+                            if (entry != null) {
+                                entry.setState(FlowEntryState.PENDING_REMOVE);
+                                return op;
+                            }
+                            break;
+                        case MODIFY:
+                            //TODO: figure this out at some point
+                            break;
+                        default:
+                            log.warn("Unknown flow operation operator: {}", op.getOperator());
                     }
-                } else if (op.equals(FlowRuleOperation.ADD)) {
-                    StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
-                    DeviceId deviceId = flowRule.deviceId();
-                    Collection<StoredFlowEntry> ft = flowEntries.get(deviceId);
-
-                    // always add requested FlowRule
-                    // Note: 2 equal FlowEntry may have different treatment
-                    ft.remove(flowEntry);
-                    ft.add(flowEntry);
-                    toAdd.add(batchEntry);
+                    return null;
                 }
-            }
-            if (toAdd.isEmpty() && toRemove.isEmpty()) {
-                return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
-            }
-
-            // create remote backup copies
-            updateBackup(did, toAdd, toRemove);
-        } finally {
-            flowEntriesLock.writeLock().unlock();
+        ).filter(op -> op != null).collect(Collectors.toSet());
+        if (currentOps.isEmpty()) {
+            batchOperationComplete(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
+            return;
         }
+        updateBackup(did, currentOps);
 
-        SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
-        final int batchId = localBatchIdGen.incrementAndGet();
 
-        pendingFutures.put(batchId, r);
-        notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+        notifyDelegate(FlowRuleBatchEvent.requested(new
+             FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
 
-        return r;
+
     }
 
-    private void updateBackup(final DeviceId deviceId,
-                              final List<FlowRuleBatchEntry> toAdd,
-                              final List<FlowRuleBatchEntry> list) {
-
-        Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
+    private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
+        Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
 
         if (syncBackup) {
             // wait for backup to complete
             try {
-                submit.get();
+                backup.get();
             } catch (InterruptedException | ExecutionException e) {
                 log.error("Failed to create backups", e);
             }
         }
     }
 
-    private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
-
-        updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
-    }
-
     @Override
     public void deleteFlowRule(FlowRule rule) {
-        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
+        storeBatch(
+                new FlowRuleBatchOperation(
+                        Arrays.asList(
+                                new FlowRuleBatchEntry(
+                                        FlowRuleOperation.REMOVE,
+                                        rule)), rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
@@ -484,37 +486,35 @@
         }
 
         log.warn("Tried to update FlowRule {} state,"
-                + " while the Node was not the master.", rule);
+                         + " while the Node was not the master.", rule);
         return null;
     }
 
     private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
         final DeviceId did = rule.deviceId();
 
-        flowEntriesLock.writeLock().lock();
-        try {
-            // check if this new rule is an update to an existing entry
-            StoredFlowEntry stored = getFlowEntryInternal(rule);
-            if (stored != null) {
-                stored.setBytes(rule.bytes());
-                stored.setLife(rule.life());
-                stored.setPackets(rule.packets());
-                if (stored.state() == FlowEntryState.PENDING_ADD) {
-                    stored.setState(FlowEntryState.ADDED);
-                    FlowRuleBatchEntry entry =
-                            new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
-                    updateBackup(did, Arrays.asList(entry));
-                    return new FlowRuleEvent(Type.RULE_ADDED, rule);
-                }
-                return new FlowRuleEvent(Type.RULE_UPDATED, rule);
-            }
 
-            // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
-            // TODO: also update backup if the behavior is correct.
-            flowEntries.put(did, new DefaultFlowEntry(rule));
-        } finally {
-            flowEntriesLock.writeLock().unlock();
+        // check if this new rule is an update to an existing entry
+        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
+        if (stored != null) {
+            stored.setBytes(rule.bytes());
+            stored.setLife(rule.life());
+            stored.setPackets(rule.packets());
+            if (stored.state() == FlowEntryState.PENDING_ADD) {
+                stored.setState(FlowEntryState.ADDED);
+                FlowRuleBatchEntry entry =
+                        new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+                updateBackup(did, Sets.newHashSet(entry));
+                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+            }
+            return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
+
+        // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+        // TODO: also update backup if the behavior is correct.
+        flowTable.add(rule);
+
+
         return null;
 
     }
@@ -540,9 +540,9 @@
                   replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
-                  clusterService.getLocalNode().id(),
-                  REMOVE_FLOW_ENTRY,
-                  SERIALIZER.encode(rule));
+                clusterService.getLocalNode().id(),
+                REMOVE_FLOW_ENTRY,
+                SERIALIZER.encode(rule));
 
         try {
             Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -555,38 +555,42 @@
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
         final DeviceId deviceId = rule.deviceId();
-        flowEntriesLock.writeLock().lock();
-        try {
-            // This is where one could mark a rule as removed and still keep it in the store.
-            final boolean removed = flowEntries.remove(deviceId, rule);
-            FlowRuleBatchEntry entry =
-                    new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
-            updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
-            if (removed) {
-                return new FlowRuleEvent(RULE_REMOVED, rule);
-            } else {
-                return null;
-            }
-        } finally {
-            flowEntriesLock.writeLock().unlock();
+        // This is where one could mark a rule as removed and still keep it in the store.
+        final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
+        FlowRuleBatchEntry entry =
+                new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+        updateBackup(deviceId, Sets.newHashSet(entry));
+        if (removed) {
+            return new FlowRuleEvent(RULE_REMOVED, rule);
+        } else {
+            return null;
         }
+
     }
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
-        final Integer batchId = event.subject().batchId();
-        SettableFuture<CompletedBatchOperation> future
-            = pendingFutures.getIfPresent(batchId);
-        if (future != null) {
-            future.set(event.result());
-            pendingFutures.invalidate(batchId);
+        //FIXME: need a per device pending response
+
+        NodeId nodeId = pendingResponses.remove(event.subject().batchId());
+        if (nodeId == null) {
+            notifyDelegate(event);
+        } else {
+            try {
+                ClusterMessage message = new ClusterMessage(
+                        clusterService.getLocalNode().id(),
+                        REMOTE_APPLY_COMPLETED,
+                        SERIALIZER.encode(event));
+                clusterCommunicator.sendAndReceive(message, nodeId);
+            } catch (IOException e) {
+                log.warn("Failed to respond to peer for batch operation result");
+            }
         }
-        notifyDelegate(event);
     }
 
     private void loadFromBackup(final DeviceId did) {
 
-        flowEntriesLock.writeLock().lock();
+
         try {
             log.debug("Loading FlowRules for {} from backups", did);
             SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
@@ -595,38 +599,21 @@
 
                 log.trace("loading {}", e.getValue());
                 for (StoredFlowEntry entry : e.getValue()) {
-                    flowEntries.remove(did, entry);
-                    flowEntries.put(did, entry);
+                    flowTable.getFlowEntriesById(entry).remove(entry);
+                    flowTable.getFlowEntriesById(entry).add(entry);
+
+
                 }
             }
         } catch (ExecutionException e) {
             log.error("Failed to load backup flowtable for {}", did, e);
-        } finally {
-            flowEntriesLock.writeLock().unlock();
         }
     }
 
     private void removeFromPrimary(final DeviceId did) {
-        Collection<StoredFlowEntry> removed = null;
-        flowEntriesLock.writeLock().lock();
-        try {
-            removed = flowEntries.removeAll(did);
-        } finally {
-            flowEntriesLock.writeLock().unlock();
-        }
-        log.trace("removedFromPrimary {}", removed);
+        flowTable.clearDevice(did);
     }
 
-    private static final class TimeoutFuture
-        implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
-        @Override
-        public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
-            // wrapping in ExecutionException to support Future.get
-            notification.getValue()
-                .setException(new ExecutionException("Timed out",
-                                                     new TimeoutException()));
-        }
-    }
 
     private final class OnStoreBatch implements ClusterMessageHandler {
         private final NodeId local;
@@ -640,7 +627,7 @@
             FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
             log.debug("received batch request {}", operation);
 
-            final DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
+            final DeviceId deviceId = operation.deviceId();
             ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
             if (!local.equals(replicaInfo.master().orNull())) {
 
@@ -648,7 +635,7 @@
                 for (FlowRuleBatchEntry op : operation.getOperations()) {
                     failures.add(op.target());
                 }
-                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
                 // This node is no longer the master, respond as all failed.
                 // TODO: we might want to wrap response in envelope
                 // to distinguish sw programming failure and hand over
@@ -661,36 +648,15 @@
                 return;
             }
 
-            final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
 
-            f.addListener(new Runnable() {
+            pendingResponses.put(operation.id(), message.sender());
+            storeBatchInternal(operation);
 
-                @Override
-                public void run() {
-                    CompletedBatchOperation result;
-                    try {
-                        result = f.get();
-                    } catch (InterruptedException | ExecutionException e) {
-                        log.error("Batch operation failed", e);
-                        // create everything failed response
-                        Set<FlowRule> failures = new HashSet<>(operation.size());
-                        for (FlowRuleBatchEntry op : operation.getOperations()) {
-                            failures.add(op.target());
-                        }
-                        result = new CompletedBatchOperation(false, failures);
-                    }
-                    try {
-                        message.respond(SERIALIZER.encode(result));
-                    } catch (IOException e) {
-                        log.error("Failed to respond back", e);
-                    }
-                }
-            }, futureListeners);
         }
     }
 
     private final class SMapLoader
-        extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+            extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
 
         @Override
         public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
@@ -701,7 +667,7 @@
     }
 
     private final class InternalReplicaInfoEventListener
-        implements ReplicaInfoEventListener {
+            implements ReplicaInfoEventListener {
 
         @Override
         public void event(ReplicaInfoEvent event) {
@@ -710,98 +676,166 @@
             final ReplicaInfo rInfo = event.replicaInfo();
 
             switch (event.type()) {
-            case MASTER_CHANGED:
-                if (local.equals(rInfo.master().orNull())) {
-                    // This node is the new master, populate local structure
-                    // from backup
-                    loadFromBackup(did);
-                } else {
-                    // This node is no longer the master holder,
-                    // clean local structure
-                    removeFromPrimary(did);
-                    // TODO: probably should stop pending backup activities in
-                    // executors to avoid overwriting with old value
-                }
-                break;
-            default:
-                break;
+                case MASTER_CHANGED:
+                    if (local.equals(rInfo.master().orNull())) {
+                        // This node is the new master, populate local structure
+                        // from backup
+                        loadFromBackup(did);
+                    } else {
+                        // This node is no longer the master holder,
+                        // clean local structure
+                        removeFromPrimary(did);
+                        // TODO: probably should stop pending backup activities in
+                        // executors to avoid overwriting with old value
+                    }
+                    break;
+                default:
+                    break;
 
             }
         }
     }
 
     // Task to update FlowEntries in backup HZ store
-    // TODO: Should be refactored to contain only one list and not
-    //      toAdd and toRemove
     private final class UpdateBackup implements Runnable {
 
         private final DeviceId deviceId;
-        private final List<FlowRuleBatchEntry> toAdd;
-        private final List<FlowRuleBatchEntry> toRemove;
+        private final Set<FlowRuleBatchEntry> ops;
+
 
         public UpdateBackup(DeviceId deviceId,
-                             List<FlowRuleBatchEntry> toAdd,
-                             List<FlowRuleBatchEntry> list) {
+                            Set<FlowRuleBatchEntry> ops) {
             this.deviceId = checkNotNull(deviceId);
-            this.toAdd = checkNotNull(toAdd);
-            this.toRemove = checkNotNull(list);
+            this.ops = checkNotNull(ops);
+
         }
 
         @Override
         public void run() {
             try {
-                log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
+                log.trace("update backup {} {}", deviceId, ops
+                );
                 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
-                // Following should be rewritten using async APIs
-                for (FlowRuleBatchEntry bEntry : toAdd) {
-                    final FlowRule entry = bEntry.target();
-                    final FlowId id = entry.id();
-                    ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
-                    List<StoredFlowEntry> list = new ArrayList<>();
-                    if (original != null) {
-                        list.addAll(original);
-                    }
 
-                    list.remove(bEntry.target());
-                    list.add((StoredFlowEntry) entry);
 
-                    ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
-                    boolean success;
-                    if (original == null) {
-                        success = (backupFlowTable.putIfAbsent(id, newValue) == null);
-                    } else {
-                        success = backupFlowTable.replace(id, original, newValue);
-                    }
-                    if (!success) {
-                        log.error("Updating backup failed.");
-                    }
-                }
-                for (FlowRuleBatchEntry bEntry : toRemove) {
-                    final FlowRule entry = bEntry.target();
-                    final FlowId id = entry.id();
-                    ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
-                    List<StoredFlowEntry> list = new ArrayList<>();
-                    if (original != null) {
-                        list.addAll(original);
-                    }
+                ops.stream().forEach(
+                        op -> {
+                            final FlowRule entry = op.getTarget();
+                            final FlowId id = entry.id();
+                            ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+                            List<StoredFlowEntry> list = new ArrayList<>();
+                            if (original != null) {
+                                list.addAll(original);
+                            }
+                            list.remove(op.getTarget());
+                            if (op.getOperator() == FlowRuleOperation.ADD) {
+                                list.add((StoredFlowEntry) entry);
+                            }
 
-                    list.remove(bEntry.target());
+                            ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+                            boolean success;
+                            if (original == null) {
+                                success = (backupFlowTable.putIfAbsent(id, newValue) == null);
+                            } else {
+                                success = backupFlowTable.replace(id, original, newValue);
+                            }
+                            if (!success) {
+                                log.error("Updating backup failed.");
+                            }
 
-                    ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
-                    boolean success;
-                    if (original == null) {
-                        success = (backupFlowTable.putIfAbsent(id, newValue) == null);
-                    } else {
-                        success = backupFlowTable.replace(id, original, newValue);
-                    }
-                    if (!success) {
-                        log.error("Updating backup failed.");
-                    }
-                }
+                        }
+                );
             } catch (ExecutionException e) {
                 log.error("Failed to write to backups", e);
             }
 
         }
     }
+
+    private class InternalFlowTable {
+
+        /*
+            TODO: This needs to be cleaned up. Perhaps using the eventually consistent
+            map when it supports distributed to a sequence of instances.
+         */
+
+
+        private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+                flowEntries = new ConcurrentHashMap<>();
+
+
+        private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
+            return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
+        }
+
+        /**
+         * Returns the flow table for specified device.
+         *
+         * @param deviceId identifier of the device
+         * @return Map representing Flow Table of given device.
+         */
+        private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
+            return createIfAbsentUnchecked(flowEntries,
+                                           deviceId, lazyEmptyFlowTable());
+        }
+
+        private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
+            final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+            Set<StoredFlowEntry> r = flowTable.get(flowId);
+            if (r == null) {
+                final Set<StoredFlowEntry> concurrentlyAdded;
+                r = new CopyOnWriteArraySet<>();
+                concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
+                if (concurrentlyAdded != null) {
+                    return concurrentlyAdded;
+                }
+            }
+            return r;
+        }
+
+        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+            for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
+                if (f.equals(rule)) {
+                    return f;
+                }
+            }
+            return null;
+        }
+
+        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+            return getFlowTable(deviceId).values().stream()
+                    .flatMap((list -> list.stream())).collect(Collectors.toSet());
+
+        }
+
+
+        public StoredFlowEntry getFlowEntry(FlowRule rule) {
+            return getFlowEntryInternal(rule);
+        }
+
+        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
+            return getFlowEntriesInternal(deviceId);
+        }
+
+        public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
+            return getFlowEntriesInternal(entry.deviceId(), entry.id());
+        }
+
+        public void add(FlowEntry rule) {
+            ((CopyOnWriteArraySet)
+                    getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
+        }
+
+        public boolean remove(DeviceId deviceId, FlowEntry rule) {
+            return ((CopyOnWriteArraySet)
+                    getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
+            //return flowEntries.remove(deviceId, rule);
+        }
+
+        public void clearDevice(DeviceId did) {
+            flowEntries.remove(did);
+        }
+    }
+
+
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
index a21d73a..79df272 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
@@ -34,4 +34,7 @@
 
     public static final MessageSubject REMOVE_FLOW_ENTRY
         = new MessageSubject("peer-forward-remove-flow-entry");
+
+    public static final MessageSubject REMOTE_APPLY_COMPLETED
+            = new MessageSubject("peer-apply-completed");
 }