Improve fabric.p4 to reduce pipeline resources and refactor pipeconf impl

This patch affects both the P4 pipeline implementation and the
Java pipeconf.

P4 PIPELINE
- Fewer tables and smarter use of metadata to reduce inter-table
dependencies and favor parallel execution of tables.
- Removed unused actions / renamed existing ones to make forwarding
behavior clearer (e.g. ingress_port_vlan table)
- Remove co-existence of simple and hashed tables. Hashed should be the
default one, but implementations that do not support action profiles
might compile fabric.p4 to use the simple one.
- Use @name annotations for match fields to make control plane
independent of table implementation.
- Use @hidden to avoid showing in the p4info actions and tables that
cannot be controlled at runtime.
- First attempt to support double VLAN cross-connect (xconnect table).
- New design has been tested with "fabric-refactoring" branch of
fabric-p4test:
github.com/opennetworkinglab/fabric-p4test/tree/fabric-refactoring

JAVA PIPECONF
This patch brings a major refactoring that reflects the experience
gathered in the past months of working on fabric.p4 and reasoning on its
pipeconf implementation. Indeed, the FlowObjective API is
under-specified and sometimes ambiguous, which makes the process of
creating and maintaining a pipeliner implementation tedious. This
refactoring brings a simplified implementation by removing unused/
unnecessary functionalities and by recognizing commonality when possible
(e.g. by means of abstract and utility classes). It also makes design
patterns more explicit and consistent. Overall, the goal is to reduce
technical debt and to make it easier to support new features as we
evolve fabric.p4.

Changes include:
- Changes in pipeliner/interpreter to reflect new pipeline design.
- By default translate objective treatment to PiAction. This favors
debuggability of flow rules in ONOS.
- Support new NextObjective’s NextTreatment class.
- Remove lots of unused/unnecessary code (e.g. async callback handling
for pending objective install status in pipeliner as current
implementation was always returning success)
- Gather commonality in abstract classes and simplify implementation
of the objective translators (filtering, forwarding, next)
- New implementation of ForwardingFunctionTypes (FFT) that looks at
criterion instance values along with their types (to avoid relying on
case-specific if-else conditions to recognize variants of an FFT)
- Adaptive translation of NextObjective based on presence of simple or
hashed table.
- Support DENY FilteringObjective

Also:
- Fix onos-p4-gen-constants to avoid generating conflicting
PiMatchFieldId variable names.
- Install Graphviz tools in p4vm to generate p4c graphs
- Generate p4c graphs by default when compiling fabric.p4
- Use more compact Hex string when printing PI values

Change-Id: Ife79e44054dc5bc48833f95d0551a7370150eac5
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index 7b3006d..4bf1bb5 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,56 +16,50 @@
 
 package org.onosproject.pipelines.fabric.pipeliner;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.core.GroupId;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerContext;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.driver.Driver;
-import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.IdNextTreatment;
 import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.NextTreatment;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.group.GroupDescription;
-import org.onosproject.net.group.GroupEvent;
 import org.onosproject.net.group.GroupService;
+import org.onosproject.pipelines.fabric.AbstractFabricHandlerBehavior;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.lang.String.format;
+import static org.onosproject.pipelines.fabric.FabricUtils.outputPort;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Pipeliner for fabric pipeline.
+ * Pipeliner implementation for fabric pipeline which uses ObjectiveTranslator
+ * implementations to translate flow objectives for the different blocks,
+ * filtering, forwarding and next.
  */
-public class FabricPipeliner  extends AbstractHandlerBehaviour implements Pipeliner {
+public class FabricPipeliner extends AbstractFabricHandlerBehavior
+        implements Pipeliner {
+
     private static final Logger log = getLogger(FabricPipeliner.class);
 
     protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
@@ -73,270 +67,213 @@
             .register(FabricNextGroup.class)
             .build("FabricPipeliner");
 
-    private static final int NUM_CALLBACK_THREAD = 2;
-
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
     protected GroupService groupService;
     protected FlowObjectiveStore flowObjectiveStore;
-    FabricFilteringPipeliner pipelinerFilter;
-    FabricForwardingPipeliner pipelinerForward;
-    FabricNextPipeliner pipelinerNext;
 
-    private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
-    private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
-    private Map<Objective, PendingInstallObjective> pendingInstallObjectives = Maps.newConcurrentMap();
+    private FilteringObjectiveTranslator filteringTranslator;
+    private ForwardingObjectiveTranslator forwardingTranslator;
+    private NextObjectiveTranslator nextTranslator;
 
-    private static ExecutorService flowObjCallbackExecutor =
-            Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
-
+    private final ExecutorService callbackExecutor = SharedExecutors.getPoolThreadExecutor();
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
-        Driver driver = handler().driver();
         this.deviceId = deviceId;
         this.flowRuleService = context.directory().get(FlowRuleService.class);
         this.groupService = context.directory().get(GroupService.class);
         this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
-        this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
-        this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
-        this.pipelinerNext = new FabricNextPipeliner(deviceId, driver);
+        this.filteringTranslator = new FilteringObjectiveTranslator(deviceId, capabilities);
+        this.forwardingTranslator = new ForwardingObjectiveTranslator(deviceId, capabilities);
+        this.nextTranslator = new NextObjectiveTranslator(deviceId, capabilities);
     }
 
     @Override
-    public void filter(FilteringObjective filterObjective) {
-        PipelinerTranslationResult result = pipelinerFilter.filter(filterObjective);
-        if (result.error().isPresent()) {
-            fail(filterObjective, result.error().get());
-            return;
-        }
-
-        applyTranslationResult(filterObjective, result, error -> {
-            if (error == null) {
-                success(filterObjective);
-            } else {
-                log.info("Ignore error {}. Let flow subsystem retry", error);
-                success(filterObjective);
-            }
-        });
+    public void filter(FilteringObjective obj) {
+        final ObjectiveTranslation result = filteringTranslator.translate(obj);
+        handleResult(obj, result);
     }
 
     @Override
-    public void forward(ForwardingObjective forwardObjective) {
-        PipelinerTranslationResult result = pipelinerForward.forward(forwardObjective);
-        if (result.error().isPresent()) {
-            fail(forwardObjective, result.error().get());
-            return;
-        }
-
-        applyTranslationResult(forwardObjective, result, error -> {
-            if (error == null) {
-                success(forwardObjective);
-            } else {
-                log.info("Ignore error {}. Let flow subsystem retry", error);
-                success(forwardObjective);
-            }
-        });
+    public void forward(ForwardingObjective obj) {
+        final ObjectiveTranslation result = forwardingTranslator.translate(obj);
+        handleResult(obj, result);
     }
 
     @Override
-    public void next(NextObjective nextObjective) {
-        PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
-
-        if (result.error().isPresent()) {
-            fail(nextObjective, result.error().get());
-            return;
-        }
-
-        if (nextObjective.op() == Objective.Operation.VERIFY) {
+    public void next(NextObjective obj) {
+        if (obj.op() == Objective.Operation.VERIFY) {
             // TODO: support VERIFY operation
-            log.debug("Currently we don't support VERIFY operation, return success directly to the context");
-            success(nextObjective);
+            log.debug("VERIFY operation not yet supported for NextObjective, will return success");
+            success(obj);
             return;
         }
 
-        if (nextObjective.op() == Objective.Operation.MODIFY) {
+        if (obj.op() == Objective.Operation.MODIFY) {
             // TODO: support MODIFY operation
-            log.debug("Currently we don't support MODIFY operation, return failure directly to the context");
-            fail(nextObjective, ObjectiveError.UNSUPPORTED);
+            log.warn("MODIFY operation not yet supported for NextObjective, will return failure :(");
+            fail(obj, ObjectiveError.UNSUPPORTED);
             return;
         }
 
-        applyTranslationResult(nextObjective, result, error -> {
-            if (error != null) {
-                log.info("Ignore error {}. Let flow/group subsystem retry", error);
-                success(nextObjective);
-                return;
-            }
-
-            if (nextObjective.op() == Objective.Operation.REMOVE) {
-                if (flowObjectiveStore.getNextGroup(nextObjective.id()) == null) {
-                    log.warn("Can not find next obj {} from store", nextObjective.id());
-                    return;
-                }
-                flowObjectiveStore.removeNextGroup(nextObjective.id());
-            } else {
-                // Success, put next group to objective store
-                List<PortNumber> portNumbers = Lists.newArrayList();
-                nextObjective.next().forEach(treatment ->
-                        treatment.allInstructions()
-                                .stream()
-                                .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
-                                .map(inst -> (Instructions.OutputInstruction) inst)
-                                .findFirst()
-                                .map(Instructions.OutputInstruction::port)
-                                .ifPresent(portNumbers::add)
-                );
-                FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
-                                                                portNumbers);
-                flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
-            }
-
-            success(nextObjective);
-        });
+        final ObjectiveTranslation result = nextTranslator.translate(obj);
+        handleResult(obj, result);
     }
 
     @Override
     public List<String> getNextMappings(NextGroup nextGroup) {
-        FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
-        NextObjective.Type type = fabricNextGroup.type();
-        Collection<PortNumber> outputPorts = fabricNextGroup.outputPorts();
-
-        return outputPorts.stream()
-                .map(port -> String.format("%s -> %s", type, port))
+        final FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
+        return fabricNextGroup.nextMappings().stream()
+                .map(m -> format("%s -> %s", fabricNextGroup.type(), m))
                 .collect(Collectors.toList());
     }
 
-    private void applyTranslationResult(Objective objective,
-                                        PipelinerTranslationResult result,
-                                        Consumer<ObjectiveError> callback) {
-        Collection<GroupDescription> groups = result.groups();
-        Collection<FlowRule> flowRules = result.flowRules();
-
-        Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
-        Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
-                .map(GroupId::new)
-                .map(gid -> new PendingGroupKey(gid, objective.op()))
-                .collect(Collectors.toSet());
-
-        PendingInstallObjective pio =
-                new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
-
-        flowIds.forEach(flowId -> {
-            PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
-            pendingInstallObjectiveFlows.put(pfk, pio);
-        });
-
-        pendingGroupKeys.forEach(pendingGroupKey ->
-            pendingInstallObjectiveGroups.put(pendingGroupKey, pio)
-        );
-
-        pendingInstallObjectives.put(objective, pio);
-        installGroups(objective, groups);
-        installFlows(objective, flowRules);
+    private void handleResult(Objective obj, ObjectiveTranslation result) {
+        if (result.error().isPresent()) {
+            fail(obj, result.error().get());
+            return;
+        }
+        processGroups(obj, result.groups());
+        processFlows(obj, result.flowRules());
+        if (obj instanceof NextObjective) {
+            handleNextGroup((NextObjective) obj);
+        }
+        success(obj);
     }
 
-    private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
+    private void handleNextGroup(NextObjective obj) {
+        switch (obj.op()) {
+            case REMOVE:
+                removeNextGroup(obj);
+                break;
+            case ADD:
+            case ADD_TO_EXISTING:
+            case REMOVE_FROM_EXISTING:
+            case MODIFY:
+                putNextGroup(obj);
+                break;
+            case VERIFY:
+                break;
+            default:
+                log.error("Unknown NextObjective operation '{}'", obj.op());
+        }
+    }
+
+    private void processFlows(Objective objective, Collection<FlowRule> flowRules) {
         if (flowRules.isEmpty()) {
             return;
         }
-
-        FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules);
-        if (ops == null) {
-            return;
+        final FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
+        switch (objective.op()) {
+            case ADD:
+            case ADD_TO_EXISTING:
+                flowRules.forEach(ops::add);
+                break;
+            case REMOVE:
+            case REMOVE_FROM_EXISTING:
+                flowRules.forEach(ops::remove);
+                break;
+            default:
+                log.warn("Unsupported Objective operation '{}'", objective.op());
+                return;
         }
-        flowRuleService.apply(ops);
-
-        flowRules.forEach(flow -> {
-            PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id());
-            PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
-
-            if (pio != null) {
-                pio.flowInstalled(flow.id());
-            }
-        });
+        flowRuleService.apply(ops.build());
     }
 
-    private void installGroups(Objective objective, Collection<GroupDescription> groups) {
+    private void processGroups(Objective objective, Collection<GroupDescription> groups) {
         if (groups.isEmpty()) {
             return;
         }
-
         switch (objective.op()) {
             case ADD:
                 groups.forEach(groupService::addGroup);
                 break;
             case REMOVE:
-                groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
+                groups.forEach(group -> groupService.removeGroup(
+                        deviceId, group.appCookie(), objective.appId()));
                 break;
             case ADD_TO_EXISTING:
-                groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(),
-                        group.buckets(), group.appCookie(), group.appId())
+                groups.forEach(group -> groupService.addBucketsToGroup(
+                        deviceId, group.appCookie(), group.buckets(),
+                        group.appCookie(), group.appId())
                 );
                 break;
             case REMOVE_FROM_EXISTING:
-                groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
-                        group.buckets(), group.appCookie(), group.appId())
+                groups.forEach(group -> groupService.removeBucketsFromGroup(
+                        deviceId, group.appCookie(), group.buckets(),
+                        group.appCookie(), group.appId())
                 );
                 break;
             default:
-                log.warn("Unsupported objective operation {}", objective.op());
-                return;
+                log.warn("Unsupported Objective operation {}", objective.op());
         }
+    }
 
-        groups.forEach(group -> {
-            PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op());
-            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
-            pio.groupInstalled(pendingGroupKey);
-        });
+    private void fail(Objective objective, ObjectiveError error) {
+        CompletableFuture.runAsync(
+                () -> objective.context().ifPresent(
+                        ctx -> ctx.onError(objective, error)), callbackExecutor);
 
     }
 
-    private static void fail(Objective objective, ObjectiveError error) {
-        CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)),
-                flowObjCallbackExecutor);
 
+    private void success(Objective objective) {
+        CompletableFuture.runAsync(
+                () -> objective.context().ifPresent(
+                        ctx -> ctx.onSuccess(objective)), callbackExecutor);
     }
 
-    private static void success(Objective objective) {
-        CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)),
-                flowObjCallbackExecutor);
+    private void removeNextGroup(NextObjective obj) {
+        final NextGroup removed = flowObjectiveStore.removeNextGroup(obj.id());
+        if (removed == null) {
+            log.debug("NextGroup {} was not found in FlowObjectiveStore", obj.id());
+        }
     }
 
-    private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules) {
-        FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
-        switch (objective.op()) {
-            case ADD:
-            case ADD_TO_EXISTING: // For egress VLAN
-                flowRules.forEach(ops::add);
-                break;
-            case REMOVE:
-            case REMOVE_FROM_EXISTING: // For egress VLAN
-                flowRules.forEach(ops::remove);
-                break;
+    private void putNextGroup(NextObjective obj) {
+        final List<String> nextMappings = obj.nextTreatments().stream()
+                .map(this::nextTreatmentToMappingString)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        final FabricNextGroup nextGroup = new FabricNextGroup(obj.type(), nextMappings);
+        flowObjectiveStore.putNextGroup(obj.id(), nextGroup);
+    }
+
+    private String nextTreatmentToMappingString(NextTreatment n) {
+        switch (n.type()) {
+            case TREATMENT:
+                final PortNumber p = outputPort(n);
+                return p == null ? "UNKNOWN"
+                        : format("OUTPUT:%s", p.toString());
+            case ID:
+                final IdNextTreatment id = (IdNextTreatment) n;
+                return format("NEXT_ID:%d", id.nextId());
             default:
-                log.warn("Unsupported op {} for {}", objective.op(), objective);
-                fail(objective, ObjectiveError.BADPARAMS);
-                return null;
+                log.warn("Unknown NextTreatment type '{}'", n.type());
+                return "???";
         }
-        return ops.build();
     }
 
-    class FabricNextGroup implements NextGroup {
-        private NextObjective.Type type;
-        private Collection<PortNumber> outputPorts;
+    /**
+     * NextGroup implementation.
+     */
+    private static class FabricNextGroup implements NextGroup {
 
-        FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
+        private final NextObjective.Type type;
+        private final List<String> nextMappings;
+
+        FabricNextGroup(NextObjective.Type type, List<String> nextMappings) {
             this.type = type;
-            this.outputPorts = ImmutableList.copyOf(outputPorts);
+            this.nextMappings = ImmutableList.copyOf(nextMappings);
         }
 
         NextObjective.Type type() {
             return type;
         }
 
-        Collection<PortNumber> outputPorts() {
-            return outputPorts;
+        Collection<String> nextMappings() {
+            return nextMappings;
         }
 
         @Override
@@ -344,168 +281,4 @@
             return KRYO.serialize(this);
         }
     }
-
-    class PendingInstallObjective {
-        Objective objective;
-        Collection<FlowId> flowIds;
-        Collection<PendingGroupKey> pendingGroupKeys;
-        Consumer<ObjectiveError> callback;
-
-        PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
-                                       Collection<PendingGroupKey> pendingGroupKeys,
-                                       Consumer<ObjectiveError> callback) {
-            this.objective = objective;
-            this.flowIds = flowIds;
-            this.pendingGroupKeys = pendingGroupKeys;
-            this.callback = callback;
-        }
-
-        void flowInstalled(FlowId flowId) {
-            synchronized (this) {
-                flowIds.remove(flowId);
-                checkIfFinished();
-            }
-        }
-
-        void groupInstalled(PendingGroupKey pendingGroupKey) {
-            synchronized (this) {
-                pendingGroupKeys.remove(pendingGroupKey);
-                checkIfFinished();
-            }
-        }
-
-        private void checkIfFinished() {
-            if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
-                pendingInstallObjectives.remove(objective);
-                callback.accept(null);
-            }
-        }
-
-        void failed(Objective obj, ObjectiveError error) {
-            flowIds.forEach(flowId -> {
-                PendingFlowKey pfk = new PendingFlowKey(flowId, obj.id());
-                pendingInstallObjectiveFlows.remove(pfk);
-            });
-            pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
-            pendingInstallObjectives.remove(objective);
-            callback.accept(error);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PendingInstallObjective pio = (PendingInstallObjective) o;
-            return Objects.equal(objective, pio.objective) &&
-                    Objects.equal(flowIds, pio.flowIds) &&
-                    Objects.equal(pendingGroupKeys, pio.pendingGroupKeys) &&
-                    Objects.equal(callback, pio.callback);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(objective, flowIds, pendingGroupKeys, callback);
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this)
-                    .add("obj", objective)
-                    .add("flowIds", flowIds)
-                    .add("pendingGroupKeys", pendingGroupKeys)
-                    .add("callback", callback)
-                    .toString();
-        }
-    }
-
-    class PendingFlowKey {
-        private FlowId flowId;
-        private int objId;
-
-        PendingFlowKey(FlowId flowId, int objId) {
-            this.flowId = flowId;
-            this.objId = objId;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PendingFlowKey pendingFlowKey = (PendingFlowKey) o;
-            return Objects.equal(flowId, pendingFlowKey.flowId) &&
-                    objId == pendingFlowKey.objId;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(flowId, objId);
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this)
-                    .add("flowId", flowId)
-                    .add("objId", objId)
-                    .toString();
-        }
-    }
-
-    class PendingGroupKey {
-        private GroupId groupId;
-        private GroupEvent.Type expectedEventType;
-
-        PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
-            this.groupId = groupId;
-
-            switch (objOp) {
-                case ADD:
-                    expectedEventType = GroupEvent.Type.GROUP_ADDED;
-                    break;
-                case REMOVE:
-                    expectedEventType = GroupEvent.Type.GROUP_REMOVED;
-                    break;
-                case MODIFY:
-                case ADD_TO_EXISTING:
-                case REMOVE_FROM_EXISTING:
-                    expectedEventType = GroupEvent.Type.GROUP_UPDATED;
-                    break;
-                default:
-                    expectedEventType = null;
-            }
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
-            return Objects.equal(groupId, pendingGroupKey.groupId) &&
-                    expectedEventType == pendingGroupKey.expectedEventType;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(groupId, expectedEventType);
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this)
-                    .add("groupId", groupId)
-                    .add("expectedEventType", expectedEventType)
-                    .toString();
-        }
-    }
 }