blob: 7b3006de386ee45c63fbf215ab99a358e31eb8bf [file] [log] [blame]
Yi Tseng0b809722017-11-03 10:23:26 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pipelines.fabric.pipeliner;
18
Yi Tsengfe13f3e2018-08-19 03:09:54 +080019import com.google.common.base.MoreObjects;
20import com.google.common.base.Objects;
Yi Tseng0b809722017-11-03 10:23:26 -070021import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Lists;
Charles Chan91ea9722018-08-30 15:56:32 -070023import com.google.common.collect.Maps;
Yi Tseng0b809722017-11-03 10:23:26 -070024import org.onlab.util.KryoNamespace;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080025import org.onlab.util.Tools;
Yi Tsengf78e1742018-04-08 19:57:17 +080026import org.onosproject.core.GroupId;
Yi Tseng0b809722017-11-03 10:23:26 -070027import org.onosproject.net.DeviceId;
28import org.onosproject.net.PortNumber;
29import org.onosproject.net.behaviour.NextGroup;
30import org.onosproject.net.behaviour.Pipeliner;
31import org.onosproject.net.behaviour.PipelinerContext;
32import org.onosproject.net.driver.AbstractHandlerBehaviour;
Yi Tseng4fd28432018-02-01 14:48:03 -080033import org.onosproject.net.driver.Driver;
Yi Tsengf78e1742018-04-08 19:57:17 +080034import org.onosproject.net.flow.FlowId;
Yi Tseng0b809722017-11-03 10:23:26 -070035import org.onosproject.net.flow.FlowRule;
36import org.onosproject.net.flow.FlowRuleOperations;
Yi Tseng0b809722017-11-03 10:23:26 -070037import org.onosproject.net.flow.FlowRuleService;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.flowobjective.FilteringObjective;
41import org.onosproject.net.flowobjective.FlowObjectiveStore;
42import org.onosproject.net.flowobjective.ForwardingObjective;
43import org.onosproject.net.flowobjective.NextObjective;
44import org.onosproject.net.flowobjective.Objective;
45import org.onosproject.net.flowobjective.ObjectiveError;
46import org.onosproject.net.group.GroupDescription;
47import org.onosproject.net.group.GroupEvent;
Yi Tseng0b809722017-11-03 10:23:26 -070048import org.onosproject.net.group.GroupService;
49import org.onosproject.store.serializers.KryoNamespaces;
50import org.slf4j.Logger;
51
52import java.util.Collection;
53import java.util.List;
Yi Tseng1b154bd2017-11-20 17:48:19 -080054import java.util.Map;
Yi Tsengf78e1742018-04-08 19:57:17 +080055import java.util.Set;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080056import java.util.concurrent.CompletableFuture;
Yi Tsengf78e1742018-04-08 19:57:17 +080057import java.util.concurrent.ConcurrentHashMap;
Yi Tsengfe13f3e2018-08-19 03:09:54 +080058import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
Yi Tseng0b809722017-11-03 10:23:26 -070060import java.util.function.Consumer;
Yi Tseng1b154bd2017-11-20 17:48:19 -080061import java.util.stream.Collectors;
Yi Tseng0b809722017-11-03 10:23:26 -070062
63import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Pipeliner for fabric pipeline.
67 */
68public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeliner {
69 private static final Logger log = getLogger(FabricPipeliner.class);
70
71 protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
72 .register(KryoNamespaces.API)
73 .register(FabricNextGroup.class)
74 .build("FabricPipeliner");
75
Yi Tsengfe13f3e2018-08-19 03:09:54 +080076 private static final int NUM_CALLBACK_THREAD = 2;
Yi Tseng0b809722017-11-03 10:23:26 -070077
78 protected DeviceId deviceId;
79 protected FlowRuleService flowRuleService;
80 protected GroupService groupService;
81 protected FlowObjectiveStore flowObjectiveStore;
Charles Chan91ea9722018-08-30 15:56:32 -070082 FabricFilteringPipeliner pipelinerFilter;
83 FabricForwardingPipeliner pipelinerForward;
84 FabricNextPipeliner pipelinerNext;
Yi Tseng0b809722017-11-03 10:23:26 -070085
Charles Chan68e59732018-08-21 15:56:07 -070086 private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
Yi Tsengfe13f3e2018-08-19 03:09:54 +080087 private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
Charles Chan91ea9722018-08-30 15:56:32 -070088 private Map<Objective, PendingInstallObjective> pendingInstallObjectives = Maps.newConcurrentMap();
89
Yi Tsengfe13f3e2018-08-19 03:09:54 +080090 private static ExecutorService flowObjCallbackExecutor =
91 Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
Yi Tsengf78e1742018-04-08 19:57:17 +080092
Yi Tseng0b809722017-11-03 10:23:26 -070093
94 @Override
95 public void init(DeviceId deviceId, PipelinerContext context) {
Yi Tseng4fd28432018-02-01 14:48:03 -080096 Driver driver = handler().driver();
Yi Tseng0b809722017-11-03 10:23:26 -070097 this.deviceId = deviceId;
98 this.flowRuleService = context.directory().get(FlowRuleService.class);
99 this.groupService = context.directory().get(GroupService.class);
100 this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
101 this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
102 this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
Yi Tseng4fd28432018-02-01 14:48:03 -0800103 this.pipelinerNext = new FabricNextPipeliner(deviceId, driver);
Yi Tseng0b809722017-11-03 10:23:26 -0700104 }
105
106 @Override
107 public void filter(FilteringObjective filterObjective) {
108 PipelinerTranslationResult result = pipelinerFilter.filter(filterObjective);
109 if (result.error().isPresent()) {
110 fail(filterObjective, result.error().get());
111 return;
112 }
113
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800114 applyTranslationResult(filterObjective, result, error -> {
115 if (error == null) {
Yi Tseng0b809722017-11-03 10:23:26 -0700116 success(filterObjective);
117 } else {
Charles Chan91ea9722018-08-30 15:56:32 -0700118 log.info("Ignore error {}. Let flow subsystem retry", error);
119 success(filterObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700120 }
121 });
122 }
123
124 @Override
125 public void forward(ForwardingObjective forwardObjective) {
126 PipelinerTranslationResult result = pipelinerForward.forward(forwardObjective);
127 if (result.error().isPresent()) {
128 fail(forwardObjective, result.error().get());
129 return;
130 }
131
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800132 applyTranslationResult(forwardObjective, result, error -> {
133 if (error == null) {
Yi Tseng0b809722017-11-03 10:23:26 -0700134 success(forwardObjective);
135 } else {
Charles Chan91ea9722018-08-30 15:56:32 -0700136 log.info("Ignore error {}. Let flow subsystem retry", error);
137 success(forwardObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700138 }
139 });
140 }
141
142 @Override
143 public void next(NextObjective nextObjective) {
144 PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
145
146 if (result.error().isPresent()) {
147 fail(nextObjective, result.error().get());
148 return;
149 }
150
Yi Tseng1b154bd2017-11-20 17:48:19 -0800151 if (nextObjective.op() == Objective.Operation.VERIFY) {
152 // TODO: support VERIFY operation
153 log.debug("Currently we don't support VERIFY operation, return success directly to the context");
154 success(nextObjective);
155 return;
156 }
157
Charles Chan91ea9722018-08-30 15:56:32 -0700158 if (nextObjective.op() == Objective.Operation.MODIFY) {
159 // TODO: support MODIFY operation
160 log.debug("Currently we don't support MODIFY operation, return failure directly to the context");
161 fail(nextObjective, ObjectiveError.UNSUPPORTED);
162 return;
163 }
164
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800165 applyTranslationResult(nextObjective, result, error -> {
166 if (error != null) {
Charles Chan91ea9722018-08-30 15:56:32 -0700167 log.info("Ignore error {}. Let flow/group subsystem retry", error);
168 success(nextObjective);
Yi Tseng0b809722017-11-03 10:23:26 -0700169 return;
170 }
171
Yi Tseng15ab4b02018-09-14 11:14:43 -0700172 if (nextObjective.op() == Objective.Operation.REMOVE) {
173 if (flowObjectiveStore.getNextGroup(nextObjective.id()) == null) {
174 log.warn("Can not find next obj {} from store", nextObjective.id());
175 return;
176 }
177 flowObjectiveStore.removeNextGroup(nextObjective.id());
178 } else {
179 // Success, put next group to objective store
180 List<PortNumber> portNumbers = Lists.newArrayList();
181 nextObjective.next().forEach(treatment ->
182 treatment.allInstructions()
183 .stream()
184 .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
185 .map(inst -> (Instructions.OutputInstruction) inst)
186 .findFirst()
187 .map(Instructions.OutputInstruction::port)
188 .ifPresent(portNumbers::add)
189 );
190 FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
191 portNumbers);
192 flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
193 }
194
Yi Tseng0b809722017-11-03 10:23:26 -0700195 success(nextObjective);
196 });
197 }
198
199 @Override
200 public List<String> getNextMappings(NextGroup nextGroup) {
Yi Tseng1b154bd2017-11-20 17:48:19 -0800201 FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
202 NextObjective.Type type = fabricNextGroup.type();
203 Collection<PortNumber> outputPorts = fabricNextGroup.outputPorts();
204
205 return outputPorts.stream()
206 .map(port -> String.format("%s -> %s", type, port))
207 .collect(Collectors.toList());
Yi Tseng0b809722017-11-03 10:23:26 -0700208 }
209
210 private void applyTranslationResult(Objective objective,
211 PipelinerTranslationResult result,
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800212 Consumer<ObjectiveError> callback) {
Yi Tseng0b809722017-11-03 10:23:26 -0700213 Collection<GroupDescription> groups = result.groups();
214 Collection<FlowRule> flowRules = result.flowRules();
Yi Tsengf78e1742018-04-08 19:57:17 +0800215
216 Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800217 Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
218 .map(GroupId::new)
219 .map(gid -> new PendingGroupKey(gid, objective.op()))
220 .collect(Collectors.toSet());
Yi Tsengf78e1742018-04-08 19:57:17 +0800221
222 PendingInstallObjective pio =
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800223 new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
Yi Tsengf78e1742018-04-08 19:57:17 +0800224
225 flowIds.forEach(flowId -> {
Charles Chan68e59732018-08-21 15:56:07 -0700226 PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
227 pendingInstallObjectiveFlows.put(pfk, pio);
Yi Tsengf78e1742018-04-08 19:57:17 +0800228 });
229
Charles Chan91ea9722018-08-30 15:56:32 -0700230 pendingGroupKeys.forEach(pendingGroupKey ->
231 pendingInstallObjectiveGroups.put(pendingGroupKey, pio)
232 );
Yi Tsengf78e1742018-04-08 19:57:17 +0800233
234 pendingInstallObjectives.put(objective, pio);
235 installGroups(objective, groups);
236 installFlows(objective, flowRules);
Yi Tseng0b809722017-11-03 10:23:26 -0700237 }
238
Yi Tsengf78e1742018-04-08 19:57:17 +0800239 private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700240 if (flowRules.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800241 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700242 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800243
Charles Chan91ea9722018-08-30 15:56:32 -0700244 FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules);
wu914ed232018-10-23 11:19:53 +0800245 if (ops == null) {
246 return;
247 }
Charles Chan91ea9722018-08-30 15:56:32 -0700248 flowRuleService.apply(ops);
Yi Tsengf78e1742018-04-08 19:57:17 +0800249
Charles Chan91ea9722018-08-30 15:56:32 -0700250 flowRules.forEach(flow -> {
251 PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id());
252 PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
253
254 if (pio != null) {
255 pio.flowInstalled(flow.id());
Yi Tseng0b809722017-11-03 10:23:26 -0700256 }
Charles Chan91ea9722018-08-30 15:56:32 -0700257 });
Yi Tseng0b809722017-11-03 10:23:26 -0700258 }
259
Yi Tsengf78e1742018-04-08 19:57:17 +0800260 private void installGroups(Objective objective, Collection<GroupDescription> groups) {
Yi Tseng0b809722017-11-03 10:23:26 -0700261 if (groups.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800262 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700263 }
Yi Tseng0b809722017-11-03 10:23:26 -0700264
265 switch (objective.op()) {
266 case ADD:
267 groups.forEach(groupService::addGroup);
268 break;
269 case REMOVE:
270 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
271 break;
Yi Tseng1b154bd2017-11-20 17:48:19 -0800272 case ADD_TO_EXISTING:
Charles Chan91ea9722018-08-30 15:56:32 -0700273 groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(),
274 group.buckets(), group.appCookie(), group.appId())
275 );
Yi Tseng1b154bd2017-11-20 17:48:19 -0800276 break;
277 case REMOVE_FROM_EXISTING:
Charles Chan91ea9722018-08-30 15:56:32 -0700278 groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
279 group.buckets(), group.appCookie(), group.appId())
280 );
Yi Tseng1b154bd2017-11-20 17:48:19 -0800281 break;
Yi Tseng0b809722017-11-03 10:23:26 -0700282 default:
283 log.warn("Unsupported objective operation {}", objective.op());
Charles Chan91ea9722018-08-30 15:56:32 -0700284 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700285 }
Yi Tseng0b809722017-11-03 10:23:26 -0700286
Charles Chan91ea9722018-08-30 15:56:32 -0700287 groups.forEach(group -> {
288 PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op());
289 PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
290 pio.groupInstalled(pendingGroupKey);
291 });
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800292
Yi Tseng0b809722017-11-03 10:23:26 -0700293 }
294
Charles Chan91ea9722018-08-30 15:56:32 -0700295 private static void fail(Objective objective, ObjectiveError error) {
296 CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)),
297 flowObjCallbackExecutor);
298
Yi Tseng0b809722017-11-03 10:23:26 -0700299 }
300
Charles Chan91ea9722018-08-30 15:56:32 -0700301 private static void success(Objective objective) {
302 CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)),
303 flowObjCallbackExecutor);
304 }
305
306 private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700307 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
308 switch (objective.op()) {
309 case ADD:
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800310 case ADD_TO_EXISTING: // For egress VLAN
Yi Tseng0b809722017-11-03 10:23:26 -0700311 flowRules.forEach(ops::add);
312 break;
313 case REMOVE:
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800314 case REMOVE_FROM_EXISTING: // For egress VLAN
Yi Tseng0b809722017-11-03 10:23:26 -0700315 flowRules.forEach(ops::remove);
316 break;
317 default:
Yi Tseng20f9e7b2018-05-24 23:27:39 +0800318 log.warn("Unsupported op {} for {}", objective.op(), objective);
Yi Tseng0b809722017-11-03 10:23:26 -0700319 fail(objective, ObjectiveError.BADPARAMS);
320 return null;
321 }
Charles Chan91ea9722018-08-30 15:56:32 -0700322 return ops.build();
Yi Tseng0b809722017-11-03 10:23:26 -0700323 }
324
325 class FabricNextGroup implements NextGroup {
326 private NextObjective.Type type;
327 private Collection<PortNumber> outputPorts;
328
Charles Chan91ea9722018-08-30 15:56:32 -0700329 FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
Yi Tseng0b809722017-11-03 10:23:26 -0700330 this.type = type;
331 this.outputPorts = ImmutableList.copyOf(outputPorts);
332 }
333
Charles Chan91ea9722018-08-30 15:56:32 -0700334 NextObjective.Type type() {
Yi Tseng0b809722017-11-03 10:23:26 -0700335 return type;
336 }
337
Charles Chan91ea9722018-08-30 15:56:32 -0700338 Collection<PortNumber> outputPorts() {
Yi Tseng0b809722017-11-03 10:23:26 -0700339 return outputPorts;
340 }
341
342 @Override
343 public byte[] data() {
344 return KRYO.serialize(this);
345 }
346 }
Yi Tseng1b154bd2017-11-20 17:48:19 -0800347
Yi Tsengf78e1742018-04-08 19:57:17 +0800348 class PendingInstallObjective {
349 Objective objective;
350 Collection<FlowId> flowIds;
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800351 Collection<PendingGroupKey> pendingGroupKeys;
352 Consumer<ObjectiveError> callback;
Yi Tsengf78e1742018-04-08 19:57:17 +0800353
Charles Chan91ea9722018-08-30 15:56:32 -0700354 PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800355 Collection<PendingGroupKey> pendingGroupKeys,
356 Consumer<ObjectiveError> callback) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800357 this.objective = objective;
358 this.flowIds = flowIds;
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800359 this.pendingGroupKeys = pendingGroupKeys;
Yi Tsengf78e1742018-04-08 19:57:17 +0800360 this.callback = callback;
361 }
362
363 void flowInstalled(FlowId flowId) {
Charles Chan68e59732018-08-21 15:56:07 -0700364 synchronized (this) {
365 flowIds.remove(flowId);
366 checkIfFinished();
367 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800368 }
369
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800370 void groupInstalled(PendingGroupKey pendingGroupKey) {
Charles Chan68e59732018-08-21 15:56:07 -0700371 synchronized (this) {
372 pendingGroupKeys.remove(pendingGroupKey);
373 checkIfFinished();
374 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800375 }
376
377 private void checkIfFinished() {
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800378 if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
Charles Chan91ea9722018-08-30 15:56:32 -0700379 pendingInstallObjectives.remove(objective);
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800380 callback.accept(null);
Yi Tsengf78e1742018-04-08 19:57:17 +0800381 }
382 }
383
Charles Chan68e59732018-08-21 15:56:07 -0700384 void failed(Objective obj, ObjectiveError error) {
385 flowIds.forEach(flowId -> {
386 PendingFlowKey pfk = new PendingFlowKey(flowId, obj.id());
387 pendingInstallObjectiveFlows.remove(pfk);
388 });
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800389 pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
Charles Chan91ea9722018-08-30 15:56:32 -0700390 pendingInstallObjectives.remove(objective);
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800391 callback.accept(error);
392 }
Charles Chan68e59732018-08-21 15:56:07 -0700393
394 @Override
395 public boolean equals(Object o) {
396 if (this == o) {
397 return true;
398 }
399 if (o == null || getClass() != o.getClass()) {
400 return false;
401 }
402 PendingInstallObjective pio = (PendingInstallObjective) o;
403 return Objects.equal(objective, pio.objective) &&
404 Objects.equal(flowIds, pio.flowIds) &&
405 Objects.equal(pendingGroupKeys, pio.pendingGroupKeys) &&
406 Objects.equal(callback, pio.callback);
407 }
408
409 @Override
410 public int hashCode() {
411 return Objects.hashCode(objective, flowIds, pendingGroupKeys, callback);
412 }
413
414 @Override
415 public String toString() {
416 return MoreObjects.toStringHelper(this)
417 .add("obj", objective)
418 .add("flowIds", flowIds)
419 .add("pendingGroupKeys", pendingGroupKeys)
420 .add("callback", callback)
421 .toString();
422 }
423 }
424
425 class PendingFlowKey {
426 private FlowId flowId;
427 private int objId;
428
429 PendingFlowKey(FlowId flowId, int objId) {
430 this.flowId = flowId;
431 this.objId = objId;
432 }
433
434 @Override
435 public boolean equals(Object o) {
436 if (this == o) {
437 return true;
438 }
439 if (o == null || getClass() != o.getClass()) {
440 return false;
441 }
442 PendingFlowKey pendingFlowKey = (PendingFlowKey) o;
443 return Objects.equal(flowId, pendingFlowKey.flowId) &&
444 objId == pendingFlowKey.objId;
445 }
446
447 @Override
448 public int hashCode() {
449 return Objects.hashCode(flowId, objId);
450 }
451
452 @Override
453 public String toString() {
454 return MoreObjects.toStringHelper(this)
455 .add("flowId", flowId)
456 .add("objId", objId)
457 .toString();
458 }
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800459 }
460
461 class PendingGroupKey {
462 private GroupId groupId;
463 private GroupEvent.Type expectedEventType;
464
Yi Tsengfe13f3e2018-08-19 03:09:54 +0800465 PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
466 this.groupId = groupId;
467
468 switch (objOp) {
469 case ADD:
470 expectedEventType = GroupEvent.Type.GROUP_ADDED;
471 break;
472 case REMOVE:
473 expectedEventType = GroupEvent.Type.GROUP_REMOVED;
474 break;
475 case MODIFY:
476 case ADD_TO_EXISTING:
477 case REMOVE_FROM_EXISTING:
478 expectedEventType = GroupEvent.Type.GROUP_UPDATED;
479 break;
480 default:
481 expectedEventType = null;
482 }
483 }
484
485 @Override
486 public boolean equals(Object o) {
487 if (this == o) {
488 return true;
489 }
490 if (o == null || getClass() != o.getClass()) {
491 return false;
492 }
493 PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
494 return Objects.equal(groupId, pendingGroupKey.groupId) &&
495 expectedEventType == pendingGroupKey.expectedEventType;
496 }
497
498 @Override
499 public int hashCode() {
500 return Objects.hashCode(groupId, expectedEventType);
501 }
502
503 @Override
504 public String toString() {
505 return MoreObjects.toStringHelper(this)
506 .add("groupId", groupId)
507 .add("expectedEventType", expectedEventType)
508 .toString();
Yi Tsengf78e1742018-04-08 19:57:17 +0800509 }
510 }
Yi Tseng0b809722017-11-03 10:23:26 -0700511}