blob: 7cd154b61d8489265c9cd817eae1d757193a2a9a [file] [log] [blame]
Yi Tseng0b809722017-11-03 10:23:26 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pipelines.fabric.pipeliner;
18
Yi Tsengfe13f3e2018-08-19 03:09:54 +080019import com.google.common.base.MoreObjects;
20import com.google.common.base.Objects;
Yi Tseng0b809722017-11-03 10:23:26 -070021import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Lists;
Charles Chan91ea9722018-08-30 15:56:32 -070023import com.google.common.collect.Maps;
Yi Tseng0b809722017-11-03 10:23:26 -070024import org.onlab.util.KryoNamespace;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080025import org.onlab.util.Tools;
Yi Tsengf78e1742018-04-08 19:57:17 +080026import org.onosproject.core.GroupId;
Yi Tseng0b809722017-11-03 10:23:26 -070027import org.onosproject.net.DeviceId;
28import org.onosproject.net.PortNumber;
29import org.onosproject.net.behaviour.NextGroup;
30import org.onosproject.net.behaviour.Pipeliner;
31import org.onosproject.net.behaviour.PipelinerContext;
32import org.onosproject.net.driver.AbstractHandlerBehaviour;
Yi Tseng4fd28432018-02-01 14:48:03 -080033import org.onosproject.net.driver.Driver;
Yi Tsengf78e1742018-04-08 19:57:17 +080034import org.onosproject.net.flow.FlowId;
Yi Tseng0b809722017-11-03 10:23:26 -070035import org.onosproject.net.flow.FlowRule;
36import org.onosproject.net.flow.FlowRuleOperations;
Yi Tseng0b809722017-11-03 10:23:26 -070037import org.onosproject.net.flow.FlowRuleService;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.flowobjective.FilteringObjective;
41import org.onosproject.net.flowobjective.FlowObjectiveStore;
42import org.onosproject.net.flowobjective.ForwardingObjective;
43import org.onosproject.net.flowobjective.NextObjective;
44import org.onosproject.net.flowobjective.Objective;
45import org.onosproject.net.flowobjective.ObjectiveError;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
Yi Tseng0b809722017-11-03 10:23:26 -070048import org.onosproject.net.group.GroupService;
49import org.onosproject.store.serializers.KryoNamespaces;
50import org.slf4j.Logger;
51
52import java.util.Collection;
53import java.util.List;
Yi Tseng1b154bd2017-11-20 17:48:19 -080054import java.util.Map;
Yi Tsengf78e1742018-04-08 19:57:17 +080055import java.util.Set;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080056import java.util.concurrent.CompletableFuture;
Yi Tsengf78e1742018-04-08 19:57:17 +080057import java.util.concurrent.ConcurrentHashMap;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080058import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
Yi Tseng0b809722017-11-03 10:23:26 -070060import java.util.function.Consumer;
Yi Tseng1b154bd2017-11-20 17:48:19 -080061import java.util.stream.Collectors;
Yi Tseng0b809722017-11-03 10:23:26 -070062
63import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Pipeliner for fabric pipeline.
67 */
68public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeliner {
69 private static final Logger log = getLogger(FabricPipeliner.class);
70
71 protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
72 .register(KryoNamespaces.API)
73 .register(FabricNextGroup.class)
74 .build("FabricPipeliner");
75
Yi Tsengfe13f3e2018-08-19 03:09:54 +080076 private static final int NUM_CALLBACK_THREAD = 2;
Yi Tseng0b809722017-11-03 10:23:26 -070077
78 protected DeviceId deviceId;
79 protected FlowRuleService flowRuleService;
80 protected GroupService groupService;
81 protected FlowObjectiveStore flowObjectiveStore;
Charles Chan91ea9722018-08-30 15:56:32 -070082 FabricFilteringPipeliner pipelinerFilter;
83 FabricForwardingPipeliner pipelinerForward;
84 FabricNextPipeliner pipelinerNext;
Yi Tseng0b809722017-11-03 10:23:26 -070085
Charles Chan68e59732018-08-21 15:56:07 -070086 private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
Yi Tsengfe13f3e2018-08-19 03:09:54 +080087 private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
Charles Chan91ea9722018-08-30 15:56:32 -070088 private Map<Objective, PendingInstallObjective> pendingInstallObjectives = Maps.newConcurrentMap();
89
Yi Tsengfe13f3e2018-08-19 03:09:54 +080090 private static ExecutorService flowObjCallbackExecutor =
91 Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
Yi Tsengf78e1742018-04-08 19:57:17 +080092
Yi Tseng0b809722017-11-03 10:23:26 -070093
94 @Override
95 public void init(DeviceId deviceId, PipelinerContext context) {
Yi Tseng4fd28432018-02-01 14:48:03 -080096 Driver driver = handler().driver();
Yi Tseng0b809722017-11-03 10:23:26 -070097 this.deviceId = deviceId;
98 this.flowRuleService = context.directory().get(FlowRuleService.class);
99 this.groupService = context.directory().get(GroupService.class);
100 this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
101 this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
102 this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
Yi Tseng4fd28432018-02-01 14:48:03 -0800103 this.pipelinerNext = new FabricNextPipeliner(deviceId, driver);
Yi Tseng0b809722017-11-03 10:23:26 -0700104 }
105
106 @Override
107 public void filter(FilteringObjective filterObjective) {
108 PipelinerTranslationResult result = pipelinerFilter.filter(filterObjective);
109 if (result.error().isPresent()) {
110 fail(filterObjective, result.error().get());
111 return;
112 }
113
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800114 applyTranslationResult(filterObjective, result, error -> {
115 if (error == null) {
Yi Tseng0b809722017-11-03 10:23:26 -0700116 success(filterObjective);
117 } else {
Charles Chan91ea9722018-08-30 15:56:32 -0700118 log.info("Ignore error {}. Let flow subsystem retry", error);
119 success(filterObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700120 }
121 });
122 }
123
124 @Override
125 public void forward(ForwardingObjective forwardObjective) {
126 PipelinerTranslationResult result = pipelinerForward.forward(forwardObjective);
127 if (result.error().isPresent()) {
128 fail(forwardObjective, result.error().get());
129 return;
130 }
131
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800132 applyTranslationResult(forwardObjective, result, error -> {
133 if (error == null) {
Yi Tseng0b809722017-11-03 10:23:26 -0700134 success(forwardObjective);
135 } else {
Charles Chan91ea9722018-08-30 15:56:32 -0700136 log.info("Ignore error {}. Let flow subsystem retry", error);
137 success(forwardObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700138 }
139 });
140 }
141
142 @Override
143 public void next(NextObjective nextObjective) {
144 PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
145
146 if (result.error().isPresent()) {
147 fail(nextObjective, result.error().get());
148 return;
149 }
150
Yi Tseng1b154bd2017-11-20 17:48:19 -0800151 if (nextObjective.op() == Objective.Operation.VERIFY) {
152 // TODO: support VERIFY operation
153 log.debug("Currently we don't support VERIFY operation, return success directly to the context");
154 success(nextObjective);
155 return;
156 }
157
Charles Chan91ea9722018-08-30 15:56:32 -0700158 if (nextObjective.op() == Objective.Operation.MODIFY) {
159 // TODO: support MODIFY operation
160 log.debug("Currently we don't support MODIFY operation, return failure directly to the context");
161 fail(nextObjective, ObjectiveError.UNSUPPORTED);
162 return;
163 }
164
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800165 applyTranslationResult(nextObjective, result, error -> {
166 if (error != null) {
Charles Chan91ea9722018-08-30 15:56:32 -0700167 log.info("Ignore error {}. Let flow/group subsystem retry", error);
168 success(nextObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700169 return;
170 }
171
Yi Tseng15ab4b02018-09-14 11:14:43 -0700172 if (nextObjective.op() == Objective.Operation.REMOVE) {
173 if (flowObjectiveStore.getNextGroup(nextObjective.id()) == null) {
174 log.warn("Can not find next obj {} from store", nextObjective.id());
175 return;
176 }
177 flowObjectiveStore.removeNextGroup(nextObjective.id());
178 } else {
179 // Success, put next group to objective store
180 List<PortNumber> portNumbers = Lists.newArrayList();
181 nextObjective.next().forEach(treatment ->
182 treatment.allInstructions()
183 .stream()
184 .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
185 .map(inst -> (Instructions.OutputInstruction) inst)
186 .findFirst()
187 .map(Instructions.OutputInstruction::port)
188 .ifPresent(portNumbers::add)
189 );
190 FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
191 portNumbers);
192 flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
193 }
194
Yi Tseng0b809722017-11-03 10:23:26 -0700195 success(nextObjective);
196 });
197 }
198
199 @Override
200 public List<String> getNextMappings(NextGroup nextGroup) {
Yi Tseng1b154bd2017-11-20 17:48:19 -0800201 FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
202 NextObjective.Type type = fabricNextGroup.type();
203 Collection<PortNumber> outputPorts = fabricNextGroup.outputPorts();
204
205 return outputPorts.stream()
206 .map(port -> String.format("%s -> %s", type, port))
207 .collect(Collectors.toList());
Yi Tseng0b809722017-11-03 10:23:26 -0700208 }
209
210 private void applyTranslationResult(Objective objective,
211 PipelinerTranslationResult result,
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800212 Consumer<ObjectiveError> callback) {
Yi Tseng0b809722017-11-03 10:23:26 -0700213 Collection<GroupDescription> groups = result.groups();
214 Collection<FlowRule> flowRules = result.flowRules();
Yi Tsengf78e1742018-04-08 19:57:17 +0800215
216 Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800217 Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
218 .map(GroupId::new)
219 .map(gid -> new PendingGroupKey(gid, objective.op()))
220 .collect(Collectors.toSet());
Yi Tsengf78e1742018-04-08 19:57:17 +0800221
222 PendingInstallObjective pio =
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800223 new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
Yi Tsengf78e1742018-04-08 19:57:17 +0800224
225 flowIds.forEach(flowId -> {
Charles Chan68e59732018-08-21 15:56:07 -0700226 PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
227 pendingInstallObjectiveFlows.put(pfk, pio);
Yi Tsengf78e1742018-04-08 19:57:17 +0800228 });
229
Charles Chan91ea9722018-08-30 15:56:32 -0700230 pendingGroupKeys.forEach(pendingGroupKey ->
231 pendingInstallObjectiveGroups.put(pendingGroupKey, pio)
232 );
Yi Tsengf78e1742018-04-08 19:57:17 +0800233
234 pendingInstallObjectives.put(objective, pio);
235 installGroups(objective, groups);
236 installFlows(objective, flowRules);
Yi Tseng0b809722017-11-03 10:23:26 -0700237 }
238
Yi Tsengf78e1742018-04-08 19:57:17 +0800239 private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700240 if (flowRules.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800241 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700242 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800243
Charles Chan91ea9722018-08-30 15:56:32 -0700244 FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules);
245 flowRuleService.apply(ops);
Yi Tsengf78e1742018-04-08 19:57:17 +0800246
Charles Chan91ea9722018-08-30 15:56:32 -0700247 flowRules.forEach(flow -> {
248 PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id());
249 PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
250
251 if (pio != null) {
252 pio.flowInstalled(flow.id());
Yi Tseng0b809722017-11-03 10:23:26 -0700253 }
Charles Chan91ea9722018-08-30 15:56:32 -0700254 });
Yi Tseng0b809722017-11-03 10:23:26 -0700255 }
256
Yi Tsengf78e1742018-04-08 19:57:17 +0800257 private void installGroups(Objective objective, Collection<GroupDescription> groups) {
Yi Tseng0b809722017-11-03 10:23:26 -0700258 if (groups.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800259 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700260 }
Yi Tseng0b809722017-11-03 10:23:26 -0700261
262 switch (objective.op()) {
263 case ADD:
264 groups.forEach(groupService::addGroup);
265 break;
266 case REMOVE:
267 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
268 break;
Yi Tseng1b154bd2017-11-20 17:48:19 -0800269 case ADD_TO_EXISTING:
Charles Chan91ea9722018-08-30 15:56:32 -0700270 groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(),
271 group.buckets(), group.appCookie(), group.appId())
272 );
Yi Tseng1b154bd2017-11-20 17:48:19 -0800273 break;
274 case REMOVE_FROM_EXISTING:
Charles Chan91ea9722018-08-30 15:56:32 -0700275 groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
276 group.buckets(), group.appCookie(), group.appId())
277 );
Yi Tseng1b154bd2017-11-20 17:48:19 -0800278 break;
Yi Tseng0b809722017-11-03 10:23:26 -0700279 default:
280 log.warn("Unsupported objective operation {}", objective.op());
Charles Chan91ea9722018-08-30 15:56:32 -0700281 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700282 }
Yi Tseng0b809722017-11-03 10:23:26 -0700283
Charles Chan91ea9722018-08-30 15:56:32 -0700284 groups.forEach(group -> {
285 PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op());
286 PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
287 pio.groupInstalled(pendingGroupKey);
288 });
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800289
Yi Tseng0b809722017-11-03 10:23:26 -0700290 }
291
Charles Chan91ea9722018-08-30 15:56:32 -0700292 private static void fail(Objective objective, ObjectiveError error) {
293 CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)),
294 flowObjCallbackExecutor);
295
Yi Tseng0b809722017-11-03 10:23:26 -0700296 }
297
Charles Chan91ea9722018-08-30 15:56:32 -0700298 private static void success(Objective objective) {
299 CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)),
300 flowObjCallbackExecutor);
301 }
302
303 private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700304 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
305 switch (objective.op()) {
306 case ADD:
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800307 case ADD_TO_EXISTING: // For egress VLAN
Yi Tseng0b809722017-11-03 10:23:26 -0700308 flowRules.forEach(ops::add);
309 break;
310 case REMOVE:
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800311 case REMOVE_FROM_EXISTING: // For egress VLAN
Yi Tseng0b809722017-11-03 10:23:26 -0700312 flowRules.forEach(ops::remove);
313 break;
314 default:
Yi Tseng20f9e7b2018-05-24 23:27:39 +0800315 log.warn("Unsupported op {} for {}", objective.op(), objective);
Yi Tseng0b809722017-11-03 10:23:26 -0700316 fail(objective, ObjectiveError.BADPARAMS);
317 return null;
318 }
Charles Chan91ea9722018-08-30 15:56:32 -0700319 return ops.build();
Yi Tseng0b809722017-11-03 10:23:26 -0700320 }
321
322 class FabricNextGroup implements NextGroup {
323 private NextObjective.Type type;
324 private Collection<PortNumber> outputPorts;
325
Charles Chan91ea9722018-08-30 15:56:32 -0700326 FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
Yi Tseng0b809722017-11-03 10:23:26 -0700327 this.type = type;
328 this.outputPorts = ImmutableList.copyOf(outputPorts);
329 }
330
Charles Chan91ea9722018-08-30 15:56:32 -0700331 NextObjective.Type type() {
Yi Tseng0b809722017-11-03 10:23:26 -0700332 return type;
333 }
334
Charles Chan91ea9722018-08-30 15:56:32 -0700335 Collection<PortNumber> outputPorts() {
Yi Tseng0b809722017-11-03 10:23:26 -0700336 return outputPorts;
337 }
338
339 @Override
340 public byte[] data() {
341 return KRYO.serialize(this);
342 }
343 }
Yi Tseng1b154bd2017-11-20 17:48:19 -0800344
Yi Tsengf78e1742018-04-08 19:57:17 +0800345 class PendingInstallObjective {
346 Objective objective;
347 Collection<FlowId> flowIds;
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800348 Collection<PendingGroupKey> pendingGroupKeys;
349 Consumer<ObjectiveError> callback;
Yi Tsengf78e1742018-04-08 19:57:17 +0800350
Charles Chan91ea9722018-08-30 15:56:32 -0700351 PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800352 Collection<PendingGroupKey> pendingGroupKeys,
353 Consumer<ObjectiveError> callback) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800354 this.objective = objective;
355 this.flowIds = flowIds;
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800356 this.pendingGroupKeys = pendingGroupKeys;
Yi Tsengf78e1742018-04-08 19:57:17 +0800357 this.callback = callback;
358 }
359
360 void flowInstalled(FlowId flowId) {
Charles Chan68e59732018-08-21 15:56:07 -0700361 synchronized (this) {
362 flowIds.remove(flowId);
363 checkIfFinished();
364 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800365 }
366
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800367 void groupInstalled(PendingGroupKey pendingGroupKey) {
Charles Chan68e59732018-08-21 15:56:07 -0700368 synchronized (this) {
369 pendingGroupKeys.remove(pendingGroupKey);
370 checkIfFinished();
371 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800372 }
373
374 private void checkIfFinished() {
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800375 if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
Charles Chan91ea9722018-08-30 15:56:32 -0700376 pendingInstallObjectives.remove(objective);
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800377 callback.accept(null);
Yi Tsengf78e1742018-04-08 19:57:17 +0800378 }
379 }
380
Charles Chan68e59732018-08-21 15:56:07 -0700381 void failed(Objective obj, ObjectiveError error) {
382 flowIds.forEach(flowId -> {
383 PendingFlowKey pfk = new PendingFlowKey(flowId, obj.id());
384 pendingInstallObjectiveFlows.remove(pfk);
385 });
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800386 pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
Charles Chan91ea9722018-08-30 15:56:32 -0700387 pendingInstallObjectives.remove(objective);
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800388 callback.accept(error);
389 }
Charles Chan68e59732018-08-21 15:56:07 -0700390
391 @Override
392 public boolean equals(Object o) {
393 if (this == o) {
394 return true;
395 }
396 if (o == null || getClass() != o.getClass()) {
397 return false;
398 }
399 PendingInstallObjective pio = (PendingInstallObjective) o;
400 return Objects.equal(objective, pio.objective) &&
401 Objects.equal(flowIds, pio.flowIds) &&
402 Objects.equal(pendingGroupKeys, pio.pendingGroupKeys) &&
403 Objects.equal(callback, pio.callback);
404 }
405
406 @Override
407 public int hashCode() {
408 return Objects.hashCode(objective, flowIds, pendingGroupKeys, callback);
409 }
410
411 @Override
412 public String toString() {
413 return MoreObjects.toStringHelper(this)
414 .add("obj", objective)
415 .add("flowIds", flowIds)
416 .add("pendingGroupKeys", pendingGroupKeys)
417 .add("callback", callback)
418 .toString();
419 }
420 }
421
422 class PendingFlowKey {
423 private FlowId flowId;
424 private int objId;
425
426 PendingFlowKey(FlowId flowId, int objId) {
427 this.flowId = flowId;
428 this.objId = objId;
429 }
430
431 @Override
432 public boolean equals(Object o) {
433 if (this == o) {
434 return true;
435 }
436 if (o == null || getClass() != o.getClass()) {
437 return false;
438 }
439 PendingFlowKey pendingFlowKey = (PendingFlowKey) o;
440 return Objects.equal(flowId, pendingFlowKey.flowId) &&
441 objId == pendingFlowKey.objId;
442 }
443
444 @Override
445 public int hashCode() {
446 return Objects.hashCode(flowId, objId);
447 }
448
449 @Override
450 public String toString() {
451 return MoreObjects.toStringHelper(this)
452 .add("flowId", flowId)
453 .add("objId", objId)
454 .toString();
455 }
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800456 }
457
458 class PendingGroupKey {
459 private GroupId groupId;
460 private GroupEvent.Type expectedEventType;
461
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800462 PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
463 this.groupId = groupId;
464
465 switch (objOp) {
466 case ADD:
467 expectedEventType = GroupEvent.Type.GROUP_ADDED;
468 break;
469 case REMOVE:
470 expectedEventType = GroupEvent.Type.GROUP_REMOVED;
471 break;
472 case MODIFY:
473 case ADD_TO_EXISTING:
474 case REMOVE_FROM_EXISTING:
475 expectedEventType = GroupEvent.Type.GROUP_UPDATED;
476 break;
477 default:
478 expectedEventType = null;
479 }
480 }
481
482 @Override
483 public boolean equals(Object o) {
484 if (this == o) {
485 return true;
486 }
487 if (o == null || getClass() != o.getClass()) {
488 return false;
489 }
490 PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
491 return Objects.equal(groupId, pendingGroupKey.groupId) &&
492 expectedEventType == pendingGroupKey.expectedEventType;
493 }
494
495 @Override
496 public int hashCode() {
497 return Objects.hashCode(groupId, expectedEventType);
498 }
499
500 @Override
501 public String toString() {
502 return MoreObjects.toStringHelper(this)
503 .add("groupId", groupId)
504 .add("expectedEventType", expectedEventType)
505 .toString();
Yi Tsengf78e1742018-04-08 19:57:17 +0800506 }
507 }
Yi Tseng0b809722017-11-03 10:23:26 -0700508}