blob: 61d3824266f5eaf201f00e57b36cd5ab5f1de8f7 [file] [log] [blame]
Yi Tseng0b809722017-11-03 10:23:26 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pipelines.fabric.pipeliner;
18
Yi Tsengfe13f3e2018-08-19 03:09:54 +080019import com.google.common.base.MoreObjects;
20import com.google.common.base.Objects;
Yi Tseng0b809722017-11-03 10:23:26 -070021import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Lists;
Charles Chan91ea9722018-08-30 15:56:32 -070023import com.google.common.collect.Maps;
Yi Tseng0b809722017-11-03 10:23:26 -070024import org.onlab.util.KryoNamespace;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080025import org.onlab.util.Tools;
Yi Tsengf78e1742018-04-08 19:57:17 +080026import org.onosproject.core.GroupId;
Yi Tseng0b809722017-11-03 10:23:26 -070027import org.onosproject.net.DeviceId;
28import org.onosproject.net.PortNumber;
29import org.onosproject.net.behaviour.NextGroup;
30import org.onosproject.net.behaviour.Pipeliner;
31import org.onosproject.net.behaviour.PipelinerContext;
32import org.onosproject.net.driver.AbstractHandlerBehaviour;
Yi Tseng4fd28432018-02-01 14:48:03 -080033import org.onosproject.net.driver.Driver;
Yi Tsengf78e1742018-04-08 19:57:17 +080034import org.onosproject.net.flow.FlowId;
Yi Tseng0b809722017-11-03 10:23:26 -070035import org.onosproject.net.flow.FlowRule;
36import org.onosproject.net.flow.FlowRuleOperations;
Yi Tseng0b809722017-11-03 10:23:26 -070037import org.onosproject.net.flow.FlowRuleService;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.flowobjective.FilteringObjective;
41import org.onosproject.net.flowobjective.FlowObjectiveStore;
42import org.onosproject.net.flowobjective.ForwardingObjective;
43import org.onosproject.net.flowobjective.NextObjective;
44import org.onosproject.net.flowobjective.Objective;
45import org.onosproject.net.flowobjective.ObjectiveError;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
Yi Tseng0b809722017-11-03 10:23:26 -070048import org.onosproject.net.group.GroupService;
49import org.onosproject.store.serializers.KryoNamespaces;
50import org.slf4j.Logger;
51
52import java.util.Collection;
53import java.util.List;
Yi Tseng1b154bd2017-11-20 17:48:19 -080054import java.util.Map;
Yi Tsengf78e1742018-04-08 19:57:17 +080055import java.util.Set;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080056import java.util.concurrent.CompletableFuture;
Yi Tsengf78e1742018-04-08 19:57:17 +080057import java.util.concurrent.ConcurrentHashMap;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080058import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
Yi Tseng0b809722017-11-03 10:23:26 -070060import java.util.function.Consumer;
Yi Tseng1b154bd2017-11-20 17:48:19 -080061import java.util.stream.Collectors;
Yi Tseng0b809722017-11-03 10:23:26 -070062
63import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Pipeliner for fabric pipeline.
67 */
68public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeliner {
69 private static final Logger log = getLogger(FabricPipeliner.class);
70
71 protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
72 .register(KryoNamespaces.API)
73 .register(FabricNextGroup.class)
74 .build("FabricPipeliner");
75
Yi Tsengfe13f3e2018-08-19 03:09:54 +080076 private static final int NUM_CALLBACK_THREAD = 2;
Yi Tseng0b809722017-11-03 10:23:26 -070077
78 protected DeviceId deviceId;
79 protected FlowRuleService flowRuleService;
80 protected GroupService groupService;
81 protected FlowObjectiveStore flowObjectiveStore;
Charles Chan91ea9722018-08-30 15:56:32 -070082 FabricFilteringPipeliner pipelinerFilter;
83 FabricForwardingPipeliner pipelinerForward;
84 FabricNextPipeliner pipelinerNext;
Yi Tseng0b809722017-11-03 10:23:26 -070085
Charles Chan68e59732018-08-21 15:56:07 -070086 private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
Yi Tsengfe13f3e2018-08-19 03:09:54 +080087 private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
Charles Chan91ea9722018-08-30 15:56:32 -070088 private Map<Objective, PendingInstallObjective> pendingInstallObjectives = Maps.newConcurrentMap();
89
Yi Tsengfe13f3e2018-08-19 03:09:54 +080090 private static ExecutorService flowObjCallbackExecutor =
91 Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
Yi Tsengf78e1742018-04-08 19:57:17 +080092
Yi Tseng0b809722017-11-03 10:23:26 -070093
94 @Override
95 public void init(DeviceId deviceId, PipelinerContext context) {
Yi Tseng4fd28432018-02-01 14:48:03 -080096 Driver driver = handler().driver();
Yi Tseng0b809722017-11-03 10:23:26 -070097 this.deviceId = deviceId;
98 this.flowRuleService = context.directory().get(FlowRuleService.class);
99 this.groupService = context.directory().get(GroupService.class);
100 this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
101 this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
102 this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
Yi Tseng4fd28432018-02-01 14:48:03 -0800103 this.pipelinerNext = new FabricNextPipeliner(deviceId, driver);
Yi Tseng0b809722017-11-03 10:23:26 -0700104 }
105
106 @Override
107 public void filter(FilteringObjective filterObjective) {
108 PipelinerTranslationResult result = pipelinerFilter.filter(filterObjective);
109 if (result.error().isPresent()) {
110 fail(filterObjective, result.error().get());
111 return;
112 }
113
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800114 applyTranslationResult(filterObjective, result, error -> {
115 if (error == null) {
Yi Tseng0b809722017-11-03 10:23:26 -0700116 success(filterObjective);
117 } else {
Charles Chan91ea9722018-08-30 15:56:32 -0700118 log.info("Ignore error {}. Let flow subsystem retry", error);
119 success(filterObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700120 }
121 });
122 }
123
124 @Override
125 public void forward(ForwardingObjective forwardObjective) {
126 PipelinerTranslationResult result = pipelinerForward.forward(forwardObjective);
127 if (result.error().isPresent()) {
128 fail(forwardObjective, result.error().get());
129 return;
130 }
131
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800132 applyTranslationResult(forwardObjective, result, error -> {
133 if (error == null) {
Yi Tseng0b809722017-11-03 10:23:26 -0700134 success(forwardObjective);
135 } else {
Charles Chan91ea9722018-08-30 15:56:32 -0700136 log.info("Ignore error {}. Let flow subsystem retry", error);
137 success(forwardObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700138 }
139 });
140 }
141
142 @Override
143 public void next(NextObjective nextObjective) {
144 PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
145
146 if (result.error().isPresent()) {
147 fail(nextObjective, result.error().get());
148 return;
149 }
150
Yi Tseng1b154bd2017-11-20 17:48:19 -0800151 if (nextObjective.op() == Objective.Operation.VERIFY) {
152 // TODO: support VERIFY operation
153 log.debug("Currently we don't support VERIFY operation, return success directly to the context");
154 success(nextObjective);
155 return;
156 }
157
Charles Chan91ea9722018-08-30 15:56:32 -0700158 if (nextObjective.op() == Objective.Operation.MODIFY) {
159 // TODO: support MODIFY operation
160 log.debug("Currently we don't support MODIFY operation, return failure directly to the context");
161 fail(nextObjective, ObjectiveError.UNSUPPORTED);
162 return;
163 }
164
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800165 applyTranslationResult(nextObjective, result, error -> {
166 if (error != null) {
Charles Chan91ea9722018-08-30 15:56:32 -0700167 log.info("Ignore error {}. Let flow/group subsystem retry", error);
168 success(nextObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700169 return;
170 }
171
172 // Success, put next group to objective store
173 List<PortNumber> portNumbers = Lists.newArrayList();
Charles Chan91ea9722018-08-30 15:56:32 -0700174 nextObjective.next().forEach(treatment ->
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800175 treatment.allInstructions()
Yi Tseng0b809722017-11-03 10:23:26 -0700176 .stream()
177 .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
178 .map(inst -> (Instructions.OutputInstruction) inst)
179 .findFirst()
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800180 .map(Instructions.OutputInstruction::port)
Charles Chan91ea9722018-08-30 15:56:32 -0700181 .ifPresent(portNumbers::add)
182 );
Yi Tseng0b809722017-11-03 10:23:26 -0700183 FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
184 portNumbers);
185 flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
186 success(nextObjective);
187 });
188 }
189
190 @Override
191 public List<String> getNextMappings(NextGroup nextGroup) {
Yi Tseng1b154bd2017-11-20 17:48:19 -0800192 FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
193 NextObjective.Type type = fabricNextGroup.type();
194 Collection<PortNumber> outputPorts = fabricNextGroup.outputPorts();
195
196 return outputPorts.stream()
197 .map(port -> String.format("%s -> %s", type, port))
198 .collect(Collectors.toList());
Yi Tseng0b809722017-11-03 10:23:26 -0700199 }
200
201 private void applyTranslationResult(Objective objective,
202 PipelinerTranslationResult result,
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800203 Consumer<ObjectiveError> callback) {
Yi Tseng0b809722017-11-03 10:23:26 -0700204 Collection<GroupDescription> groups = result.groups();
205 Collection<FlowRule> flowRules = result.flowRules();
Yi Tsengf78e1742018-04-08 19:57:17 +0800206
207 Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800208 Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
209 .map(GroupId::new)
210 .map(gid -> new PendingGroupKey(gid, objective.op()))
211 .collect(Collectors.toSet());
Yi Tsengf78e1742018-04-08 19:57:17 +0800212
213 PendingInstallObjective pio =
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800214 new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
Yi Tsengf78e1742018-04-08 19:57:17 +0800215
216 flowIds.forEach(flowId -> {
Charles Chan68e59732018-08-21 15:56:07 -0700217 PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
218 pendingInstallObjectiveFlows.put(pfk, pio);
Yi Tsengf78e1742018-04-08 19:57:17 +0800219 });
220
Charles Chan91ea9722018-08-30 15:56:32 -0700221 pendingGroupKeys.forEach(pendingGroupKey ->
222 pendingInstallObjectiveGroups.put(pendingGroupKey, pio)
223 );
Yi Tsengf78e1742018-04-08 19:57:17 +0800224
225 pendingInstallObjectives.put(objective, pio);
226 installGroups(objective, groups);
227 installFlows(objective, flowRules);
Yi Tseng0b809722017-11-03 10:23:26 -0700228 }
229
Yi Tsengf78e1742018-04-08 19:57:17 +0800230 private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700231 if (flowRules.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800232 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700233 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800234
Charles Chan91ea9722018-08-30 15:56:32 -0700235 FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules);
236 flowRuleService.apply(ops);
Yi Tsengf78e1742018-04-08 19:57:17 +0800237
Charles Chan91ea9722018-08-30 15:56:32 -0700238 flowRules.forEach(flow -> {
239 PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id());
240 PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
241
242 if (pio != null) {
243 pio.flowInstalled(flow.id());
Yi Tseng0b809722017-11-03 10:23:26 -0700244 }
Charles Chan91ea9722018-08-30 15:56:32 -0700245 });
Yi Tseng0b809722017-11-03 10:23:26 -0700246 }
247
Yi Tsengf78e1742018-04-08 19:57:17 +0800248 private void installGroups(Objective objective, Collection<GroupDescription> groups) {
Yi Tseng0b809722017-11-03 10:23:26 -0700249 if (groups.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800250 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700251 }
Yi Tseng0b809722017-11-03 10:23:26 -0700252
253 switch (objective.op()) {
254 case ADD:
255 groups.forEach(groupService::addGroup);
256 break;
257 case REMOVE:
258 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
259 break;
Yi Tseng1b154bd2017-11-20 17:48:19 -0800260 case ADD_TO_EXISTING:
Charles Chan91ea9722018-08-30 15:56:32 -0700261 groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(),
262 group.buckets(), group.appCookie(), group.appId())
263 );
Yi Tseng1b154bd2017-11-20 17:48:19 -0800264 break;
265 case REMOVE_FROM_EXISTING:
Charles Chan91ea9722018-08-30 15:56:32 -0700266 groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
267 group.buckets(), group.appCookie(), group.appId())
268 );
Yi Tseng1b154bd2017-11-20 17:48:19 -0800269 break;
Yi Tseng0b809722017-11-03 10:23:26 -0700270 default:
271 log.warn("Unsupported objective operation {}", objective.op());
Charles Chan91ea9722018-08-30 15:56:32 -0700272 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700273 }
Yi Tseng0b809722017-11-03 10:23:26 -0700274
Charles Chan91ea9722018-08-30 15:56:32 -0700275 groups.forEach(group -> {
276 PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op());
277 PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
278 pio.groupInstalled(pendingGroupKey);
279 });
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800280
Yi Tseng0b809722017-11-03 10:23:26 -0700281 }
282
Charles Chan91ea9722018-08-30 15:56:32 -0700283 private static void fail(Objective objective, ObjectiveError error) {
284 CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)),
285 flowObjCallbackExecutor);
286
Yi Tseng0b809722017-11-03 10:23:26 -0700287 }
288
Charles Chan91ea9722018-08-30 15:56:32 -0700289 private static void success(Objective objective) {
290 CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)),
291 flowObjCallbackExecutor);
292 }
293
294 private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700295 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
296 switch (objective.op()) {
297 case ADD:
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800298 case ADD_TO_EXISTING: // For egress VLAN
Yi Tseng0b809722017-11-03 10:23:26 -0700299 flowRules.forEach(ops::add);
300 break;
301 case REMOVE:
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800302 case REMOVE_FROM_EXISTING: // For egress VLAN
Yi Tseng0b809722017-11-03 10:23:26 -0700303 flowRules.forEach(ops::remove);
304 break;
305 default:
Yi Tseng20f9e7b2018-05-24 23:27:39 +0800306 log.warn("Unsupported op {} for {}", objective.op(), objective);
Yi Tseng0b809722017-11-03 10:23:26 -0700307 fail(objective, ObjectiveError.BADPARAMS);
308 return null;
309 }
Charles Chan91ea9722018-08-30 15:56:32 -0700310 return ops.build();
Yi Tseng0b809722017-11-03 10:23:26 -0700311 }
312
313 class FabricNextGroup implements NextGroup {
314 private NextObjective.Type type;
315 private Collection<PortNumber> outputPorts;
316
Charles Chan91ea9722018-08-30 15:56:32 -0700317 FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
Yi Tseng0b809722017-11-03 10:23:26 -0700318 this.type = type;
319 this.outputPorts = ImmutableList.copyOf(outputPorts);
320 }
321
Charles Chan91ea9722018-08-30 15:56:32 -0700322 NextObjective.Type type() {
Yi Tseng0b809722017-11-03 10:23:26 -0700323 return type;
324 }
325
Charles Chan91ea9722018-08-30 15:56:32 -0700326 Collection<PortNumber> outputPorts() {
Yi Tseng0b809722017-11-03 10:23:26 -0700327 return outputPorts;
328 }
329
330 @Override
331 public byte[] data() {
332 return KRYO.serialize(this);
333 }
334 }
Yi Tseng1b154bd2017-11-20 17:48:19 -0800335
Yi Tsengf78e1742018-04-08 19:57:17 +0800336 class PendingInstallObjective {
337 Objective objective;
338 Collection<FlowId> flowIds;
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800339 Collection<PendingGroupKey> pendingGroupKeys;
340 Consumer<ObjectiveError> callback;
Yi Tsengf78e1742018-04-08 19:57:17 +0800341
Charles Chan91ea9722018-08-30 15:56:32 -0700342 PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800343 Collection<PendingGroupKey> pendingGroupKeys,
344 Consumer<ObjectiveError> callback) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800345 this.objective = objective;
346 this.flowIds = flowIds;
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800347 this.pendingGroupKeys = pendingGroupKeys;
Yi Tsengf78e1742018-04-08 19:57:17 +0800348 this.callback = callback;
349 }
350
351 void flowInstalled(FlowId flowId) {
Charles Chan68e59732018-08-21 15:56:07 -0700352 synchronized (this) {
353 flowIds.remove(flowId);
354 checkIfFinished();
355 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800356 }
357
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800358 void groupInstalled(PendingGroupKey pendingGroupKey) {
Charles Chan68e59732018-08-21 15:56:07 -0700359 synchronized (this) {
360 pendingGroupKeys.remove(pendingGroupKey);
361 checkIfFinished();
362 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800363 }
364
365 private void checkIfFinished() {
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800366 if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
Charles Chan91ea9722018-08-30 15:56:32 -0700367 pendingInstallObjectives.remove(objective);
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800368 callback.accept(null);
Yi Tsengf78e1742018-04-08 19:57:17 +0800369 }
370 }
371
Charles Chan68e59732018-08-21 15:56:07 -0700372 void failed(Objective obj, ObjectiveError error) {
373 flowIds.forEach(flowId -> {
374 PendingFlowKey pfk = new PendingFlowKey(flowId, obj.id());
375 pendingInstallObjectiveFlows.remove(pfk);
376 });
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800377 pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
Charles Chan91ea9722018-08-30 15:56:32 -0700378 pendingInstallObjectives.remove(objective);
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800379 callback.accept(error);
380 }
Charles Chan68e59732018-08-21 15:56:07 -0700381
382 @Override
383 public boolean equals(Object o) {
384 if (this == o) {
385 return true;
386 }
387 if (o == null || getClass() != o.getClass()) {
388 return false;
389 }
390 PendingInstallObjective pio = (PendingInstallObjective) o;
391 return Objects.equal(objective, pio.objective) &&
392 Objects.equal(flowIds, pio.flowIds) &&
393 Objects.equal(pendingGroupKeys, pio.pendingGroupKeys) &&
394 Objects.equal(callback, pio.callback);
395 }
396
397 @Override
398 public int hashCode() {
399 return Objects.hashCode(objective, flowIds, pendingGroupKeys, callback);
400 }
401
402 @Override
403 public String toString() {
404 return MoreObjects.toStringHelper(this)
405 .add("obj", objective)
406 .add("flowIds", flowIds)
407 .add("pendingGroupKeys", pendingGroupKeys)
408 .add("callback", callback)
409 .toString();
410 }
411 }
412
413 class PendingFlowKey {
414 private FlowId flowId;
415 private int objId;
416
417 PendingFlowKey(FlowId flowId, int objId) {
418 this.flowId = flowId;
419 this.objId = objId;
420 }
421
422 @Override
423 public boolean equals(Object o) {
424 if (this == o) {
425 return true;
426 }
427 if (o == null || getClass() != o.getClass()) {
428 return false;
429 }
430 PendingFlowKey pendingFlowKey = (PendingFlowKey) o;
431 return Objects.equal(flowId, pendingFlowKey.flowId) &&
432 objId == pendingFlowKey.objId;
433 }
434
435 @Override
436 public int hashCode() {
437 return Objects.hashCode(flowId, objId);
438 }
439
440 @Override
441 public String toString() {
442 return MoreObjects.toStringHelper(this)
443 .add("flowId", flowId)
444 .add("objId", objId)
445 .toString();
446 }
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800447 }
448
449 class PendingGroupKey {
450 private GroupId groupId;
451 private GroupEvent.Type expectedEventType;
452
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800453 PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
454 this.groupId = groupId;
455
456 switch (objOp) {
457 case ADD:
458 expectedEventType = GroupEvent.Type.GROUP_ADDED;
459 break;
460 case REMOVE:
461 expectedEventType = GroupEvent.Type.GROUP_REMOVED;
462 break;
463 case MODIFY:
464 case ADD_TO_EXISTING:
465 case REMOVE_FROM_EXISTING:
466 expectedEventType = GroupEvent.Type.GROUP_UPDATED;
467 break;
468 default:
469 expectedEventType = null;
470 }
471 }
472
473 @Override
474 public boolean equals(Object o) {
475 if (this == o) {
476 return true;
477 }
478 if (o == null || getClass() != o.getClass()) {
479 return false;
480 }
481 PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
482 return Objects.equal(groupId, pendingGroupKey.groupId) &&
483 expectedEventType == pendingGroupKey.expectedEventType;
484 }
485
486 @Override
487 public int hashCode() {
488 return Objects.hashCode(groupId, expectedEventType);
489 }
490
491 @Override
492 public String toString() {
493 return MoreObjects.toStringHelper(this)
494 .add("groupId", groupId)
495 .add("expectedEventType", expectedEventType)
496 .toString();
Yi Tsengf78e1742018-04-08 19:57:17 +0800497 }
498 }
Yi Tseng0b809722017-11-03 10:23:26 -0700499}