blob: 78ea9d917ce7143c85f4698bf012abe43bf8f458 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.Multimap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070023import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080025import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import io.grpc.ManagedChannel;
Carmelo Cascone943c6642018-09-11 13:01:03 -070027import io.grpc.Metadata;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040028import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
Carmelo Cascone943c6642018-09-11 13:01:03 -070030import io.grpc.protobuf.lite.ProtoLiteUtils;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070031import io.grpc.stub.ClientCallStreamObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040032import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020033import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070034import org.onlab.util.Tools;
Yi Tseng2a340f72018-11-02 16:52:47 -070035import org.onosproject.grpc.ctl.AbstractGrpcClient;
Carmelo Cascone87892e22017-11-13 16:01:29 -080036import org.onosproject.net.pi.model.PiActionProfileId;
37import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070038import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020039import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080040import org.onosproject.net.pi.model.PiTableId;
Carmelo Casconecb4327a2018-09-11 15:17:23 -070041import org.onosproject.net.pi.runtime.PiActionProfileGroup;
42import org.onosproject.net.pi.runtime.PiActionProfileMember;
43import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
steven308017632e152018-10-20 00:51:08 +080044import org.onosproject.net.pi.runtime.PiCounterCell;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020045import org.onosproject.net.pi.runtime.PiCounterCellId;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090046import org.onosproject.net.pi.runtime.PiMeterCellConfig;
47import org.onosproject.net.pi.runtime.PiMeterCellId;
Carmelo Cascone58136812018-07-19 03:40:16 +020048import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
Andrea Campanella432f7182017-07-14 18:43:27 +020049import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080051import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040052import org.onosproject.p4runtime.api.P4RuntimeClient;
Yi Tseng2a340f72018-11-02 16:52:47 -070053import org.onosproject.p4runtime.api.P4RuntimeClientKey;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040054import org.onosproject.p4runtime.api.P4RuntimeEvent;
Carmelo Casconee5b28722018-06-22 17:28:28 +020055import p4.config.v1.P4InfoOuterClass.P4Info;
56import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020057import p4.v1.P4RuntimeGrpc;
58import p4.v1.P4RuntimeOuterClass;
59import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
60import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
61import p4.v1.P4RuntimeOuterClass.Entity;
62import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070063import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
64import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020065import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone58136812018-07-19 03:40:16 +020066import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
67import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020068import p4.v1.P4RuntimeOuterClass.ReadRequest;
69import p4.v1.P4RuntimeOuterClass.ReadResponse;
70import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
71import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
72import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
73import p4.v1.P4RuntimeOuterClass.TableEntry;
74import p4.v1.P4RuntimeOuterClass.Uint128;
75import p4.v1.P4RuntimeOuterClass.Update;
76import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077
Carmelo Casconee5b28722018-06-22 17:28:28 +020078import java.math.BigInteger;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070079import java.net.ConnectException;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070080import java.nio.ByteBuffer;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081import java.util.Collections;
82import java.util.Iterator;
83import java.util.List;
84import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020085import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020086import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040087import java.util.concurrent.CompletableFuture;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070088import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089import java.util.stream.Collectors;
90import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040091
Carmelo Casconed61fdb32017-10-30 10:09:57 -070092import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080093import static java.lang.String.format;
Carmelo Casconee44592f2018-09-12 02:24:47 -070094import static java.util.Collections.singletonList;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020095import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
96import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone58136812018-07-19 03:40:16 +020097import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020098import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +020099import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200100import static p4.v1.P4RuntimeOuterClass.PacketOut;
Carmelo Cascone58136812018-07-19 03:40:16 +0200101import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200102import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400103
104/**
105 * Implementation of a P4Runtime client.
106 */
Yi Tseng2a340f72018-11-02 16:52:47 -0700107final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400108
Carmelo Cascone943c6642018-09-11 13:01:03 -0700109 private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
110 Metadata.Key.of("grpc-status-details-bin",
111 ProtoLiteUtils.metadataMarshaller(
112 com.google.rpc.Status.getDefaultInstance()));
113
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400114 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
115 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
116 WriteOperationType.INSERT, Update.Type.INSERT,
117 WriteOperationType.MODIFY, Update.Type.MODIFY,
118 WriteOperationType.DELETE, Update.Type.DELETE
119 );
120
Carmelo Casconef423bec2017-08-30 01:56:25 +0200121 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400122 private final P4RuntimeControllerImpl controller;
123 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700124 private StreamChannelManager streamChannelManager;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400125
Carmelo Casconee5b28722018-06-22 17:28:28 +0200126 // Used by this client for write requests.
127 private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700128
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700129 private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
130
Yi Tseng82512da2017-08-16 19:46:36 -0700131 /**
132 * Default constructor.
133 *
Yi Tseng2a340f72018-11-02 16:52:47 -0700134 * @param clientKey the client key of this client
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200135 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700136 * @param controller runtime client controller
137 */
Yi Tseng2a340f72018-11-02 16:52:47 -0700138 P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
Carmelo Casconef423bec2017-08-30 01:56:25 +0200139 P4RuntimeControllerImpl controller) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700140
Yi Tsengd7716482018-10-31 15:34:30 -0700141 super(clientKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700142 this.p4DeviceId = clientKey.p4DeviceId();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400143 this.controller = controller;
Yi Tseng2a340f72018-11-02 16:52:47 -0700144
Carmelo Casconee5b28722018-06-22 17:28:28 +0200145 //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200146 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700147 this.streamChannelManager = new StreamChannelManager(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400148 }
149
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400150 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700151 public CompletableFuture<Boolean> startStreamChannel() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700152 return supplyInContext(() -> sendMasterArbitrationUpdate(false),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 "start-initStreamChannel");
154 }
155
156 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200157 public CompletableFuture<Boolean> becomeMaster() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700158 return supplyInContext(() -> sendMasterArbitrationUpdate(true),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200159 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400160 }
161
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400162 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700163 public boolean isMaster() {
164 return streamChannelManager.isOpen() && isClientMaster.get();
165 }
166
167 @Override
168 public boolean isStreamChannelOpen() {
169 return streamChannelManager.isOpen();
170 }
171
172 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700173 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
174 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400175 }
176
177 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700178 public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
179 return doIsPipelineConfigSet(pipeconf, deviceData);
180 }
181
182 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700183 public CompletableFuture<Boolean> writeTableEntries(List<PiTableEntry> piTableEntries,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400184 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200185 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
186 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400187 }
188
189 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700190 public CompletableFuture<List<PiTableEntry>> dumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700191 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
192 return supplyInContext(() -> doDumpTables(piTableIds, defaultEntries, pipeconf),
193 "dumpTables-" + piTableIds.hashCode());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400194 }
195
196 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700197 public CompletableFuture<List<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700198 return supplyInContext(() -> doDumpTables(null, false, pipeconf), "dumpAllTables");
Carmelo Casconee5b28722018-06-22 17:28:28 +0200199 }
200
201 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200202 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200203 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200204 }
205
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200206 @Override
steven308017632e152018-10-20 00:51:08 +0800207 public CompletableFuture<List<PiCounterCell>> readCounterCells(Set<PiCounterCellId> cellIds,
208 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700209 return supplyInContext(() -> doReadCounterCells(Lists.newArrayList(cellIds), pipeconf),
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200210 "readCounterCells-" + cellIds.hashCode());
211 }
212
213 @Override
steven308017632e152018-10-20 00:51:08 +0800214 public CompletableFuture<List<PiCounterCell>> readAllCounterCells(Set<PiCounterId> counterIds,
215 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700216 return supplyInContext(() -> doReadAllCounterCells(Lists.newArrayList(counterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700217 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200218 }
219
Yi Tseng82512da2017-08-16 19:46:36 -0700220 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700221 public CompletableFuture<Boolean> writeActionProfileMembers(List<PiActionProfileMember> members,
222 WriteOperationType opType,
223 PiPipeconf pipeconf) {
224 return supplyInContext(() -> doWriteActionProfileMembers(members, opType, pipeconf),
225 "writeActionProfileMembers-" + opType.name());
226 }
227
228
229 @Override
230 public CompletableFuture<Boolean> writeActionProfileGroup(PiActionProfileGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700231 WriteOperationType opType,
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700232 PiPipeconf pipeconf,
ghj0504520ed7340c2018-10-26 13:06:35 -0700233 int maxMemberSize) {
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700234 return supplyInContext(() -> doWriteActionProfileGroup(group, opType, pipeconf, maxMemberSize),
235 "writeActionProfileGroup-" + opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700236 }
237
238 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700239 public CompletableFuture<List<PiActionProfileGroup>> dumpActionProfileGroups(
240 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700241 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700242 "dumpActionProfileGroups-" + actionProfileId.id());
Yi Tseng82512da2017-08-16 19:46:36 -0700243 }
244
Yi Tseng3e7f1452017-10-20 10:31:53 -0700245 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700246 public CompletableFuture<List<PiActionProfileMemberId>> dumpActionProfileMemberIds(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700247 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
248 return supplyInContext(() -> doDumpActionProfileMemberIds(actionProfileId, pipeconf),
249 "dumpActionProfileMemberIds-" + actionProfileId.id());
250 }
251
252 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700253 public CompletableFuture<List<PiActionProfileMemberId>> removeActionProfileMembers(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700254 PiActionProfileId actionProfileId,
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700255 List<PiActionProfileMemberId> memberIds,
Carmelo Casconee44592f2018-09-12 02:24:47 -0700256 PiPipeconf pipeconf) {
257 return supplyInContext(
258 () -> doRemoveActionProfileMembers(actionProfileId, memberIds, pipeconf),
259 "cleanupActionProfileMembers-" + actionProfileId.id());
260 }
261
262 @Override
263 public CompletableFuture<Boolean> writeMeterCells(List<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900264
265 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
266 "writeMeterCells");
267 }
268
269 @Override
Carmelo Cascone58136812018-07-19 03:40:16 +0200270 public CompletableFuture<Boolean> writePreMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700271 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +0200272 WriteOperationType opType) {
273 return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
274 "writePreMulticastGroupEntries");
275 }
276
277 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700278 public CompletableFuture<List<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +0200279 return supplyInContext(this::doReadAllMulticastGroupEntries,
280 "readAllMulticastGroupEntries");
281 }
282
283 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700284 public CompletableFuture<List<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
285 PiPipeconf pipeconf) {
286 return supplyInContext(() -> doReadMeterCells(Lists.newArrayList(cellIds), pipeconf),
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900287 "readMeterCells-" + cellIds.hashCode());
288 }
289
290 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700291 public CompletableFuture<List<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
292 PiPipeconf pipeconf) {
293 return supplyInContext(() -> doReadAllMeterCells(Lists.newArrayList(meterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700294 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900295 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700296
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400297 /* Blocking method implementations below */
298
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700299 private boolean sendMasterArbitrationUpdate(boolean asMaster) {
300 BigInteger newId = controller.newMasterElectionId(deviceId);
301 if (asMaster) {
302 // Becoming master is a race. Here we increase our chances of win
303 // against other ONOS nodes in the cluster that are calling start()
304 // (which is used to start the stream RPC session, not to become
305 // master).
306 newId = newId.add(BigInteger.valueOf(1000));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200307 }
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700308 final Uint128 idMsg = bigIntegerToUint128(
309 controller.newMasterElectionId(deviceId));
Andrea Campanella1e573442018-05-17 17:07:13 +0200310
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700311 log.debug("Sending arbitration update to {}... electionId={}",
312 deviceId, newId);
313
314 streamChannelManager.send(
315 StreamMessageRequest.newBuilder()
316 .setArbitration(
317 MasterArbitrationUpdate
318 .newBuilder()
319 .setDeviceId(p4DeviceId)
320 .setElectionId(idMsg)
321 .build())
322 .build());
323 clientElectionId = idMsg;
324 return true;
325 }
326
327 private ForwardingPipelineConfig getPipelineConfig(
328 PiPipeconf pipeconf, ByteBuffer deviceData) {
329 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
330 if (p4Info == null) {
331 // Problem logged by PipeconfHelper.
332 return null;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700333 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700334
ghj050452092a48bf2018-10-22 10:50:41 -0700335 ForwardingPipelineConfig.Cookie pipeconfCookie = ForwardingPipelineConfig.Cookie
336 .newBuilder()
ghj0504520ec1a4202018-10-22 10:50:41 -0700337 .setCookie(pipeconf.fingerprint())
ghj050452092a48bf2018-10-22 10:50:41 -0700338 .build();
339
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700340 // FIXME: This is specific to PI P4Runtime implementation.
341 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
342 .newBuilder()
343 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
344 .setReassign(true)
345 .setDeviceData(ByteString.copyFrom(deviceData))
346 .build();
347
348 return ForwardingPipelineConfig
349 .newBuilder()
350 .setP4Info(p4Info)
351 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
ghj050452092a48bf2018-10-22 10:50:41 -0700352 .setCookie(pipeconfCookie)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700353 .build();
354 }
355
356 private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
357
358 GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
359 .newBuilder()
360 .setDeviceId(p4DeviceId)
ghj050452092a48bf2018-10-22 10:50:41 -0700361 .setResponseType(GetForwardingPipelineConfigRequest
362 .ResponseType.COOKIE_ONLY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700363 .build();
364
365 GetForwardingPipelineConfigResponse resp;
366 try {
367 resp = this.blockingStub
368 .getForwardingPipelineConfig(request);
369 } catch (StatusRuntimeException ex) {
370 checkGrpcException(ex);
371 // FAILED_PRECONDITION means that a pipeline config was not set in
372 // the first place. Don't bother logging.
373 if (!ex.getStatus().getCode()
374 .equals(Status.FAILED_PRECONDITION.getCode())) {
375 log.warn("Unable to get pipeline config from {}: {}",
376 deviceId, ex.getMessage());
377 }
378 return false;
379 }
ghj050452092a48bf2018-10-22 10:50:41 -0700380 if (!resp.getConfig().hasCookie()) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700381 log.warn("{} returned GetForwardingPipelineConfigResponse " +
Carmelo Casconecb831812018-11-29 13:19:39 -0800382 "with 'cookie' field unset. " +
383 "Will try by comparing 'device_data'...",
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700384 deviceId);
Carmelo Casconecb831812018-11-29 13:19:39 -0800385 return doIsPipelineConfigSetWithData(pipeconf, deviceData);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700386 }
ghj050452092a48bf2018-10-22 10:50:41 -0700387
ghj0504520ec1a4202018-10-22 10:50:41 -0700388 return resp.getConfig().getCookie().getCookie() == pipeconf.fingerprint();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700389 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200390
Carmelo Casconecb831812018-11-29 13:19:39 -0800391 private boolean doIsPipelineConfigSetWithData(PiPipeconf pipeconf, ByteBuffer deviceData) {
392
393 GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
394 .newBuilder()
395 .setDeviceId(p4DeviceId)
396 .build();
397
398 GetForwardingPipelineConfigResponse resp;
399 try {
400 resp = this.blockingStub
401 .getForwardingPipelineConfig(request);
402 } catch (StatusRuntimeException ex) {
403 checkGrpcException(ex);
404 return false;
405 }
406
407 ForwardingPipelineConfig expectedConfig = getPipelineConfig(
408 pipeconf, deviceData);
409
410 if (expectedConfig == null) {
411 return false;
412 }
413 if (!resp.hasConfig()) {
414 log.warn("{} returned GetForwardingPipelineConfigResponse " +
415 "with 'config' field unset",
416 deviceId);
417 return false;
418 }
419 if (resp.getConfig().getP4DeviceConfig().isEmpty()
420 && !expectedConfig.getP4DeviceConfig().isEmpty()) {
421 // Don't bother with a warn or error since we don't really allow
422 // updating the pipeline to a different one. So the P4Info should be
423 // enough for us.
424 log.debug("{} returned GetForwardingPipelineConfigResponse " +
425 "with empty 'p4_device_config' field, " +
426 "equality will be based only on P4Info",
427 deviceId);
428 return resp.getConfig().getP4Info().equals(
429 expectedConfig.getP4Info());
430 } else {
431 return resp.getConfig().getP4DeviceConfig()
432 .equals(expectedConfig.getP4DeviceConfig())
433 && resp.getConfig().getP4Info()
434 .equals(expectedConfig.getP4Info());
435 }
436 }
437
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700438 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400439
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700440 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
441
442 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400443
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700444 ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
445
446 if (pipelineConfig == null) {
447 // Error logged in getPipelineConfig()
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400448 return false;
449 }
450
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400451 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
452 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100453 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200454 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400455 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100456 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400457 .build();
458
459 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700460 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400461 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700462 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400463 } catch (StatusRuntimeException ex) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700464 checkGrpcException(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800465 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400466 return false;
467 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400468 }
469
Carmelo Casconee44592f2018-09-12 02:24:47 -0700470 private boolean doWriteTableEntries(List<PiTableEntry> piTableEntries, WriteOperationType opType,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400471 PiPipeconf pipeconf) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800472 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400473 return true;
474 }
475
Carmelo Casconee44592f2018-09-12 02:24:47 -0700476 List<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800477 try {
478 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
479 .stream()
480 .map(tableEntryMsg ->
481 Update.newBuilder()
482 .setEntity(Entity.newBuilder()
483 .setTableEntry(tableEntryMsg)
484 .build())
485 .setType(UPDATE_TYPES.get(opType))
486 .build())
487 .collect(Collectors.toList());
488 } catch (EncodeException e) {
489 log.error("Unable to encode table entries, aborting {} operation: {}",
490 opType.name(), e.getMessage());
491 return false;
492 }
493
Carmelo Cascone58136812018-07-19 03:40:16 +0200494 return write(updateMsgs, piTableEntries, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400495 }
496
Carmelo Casconee44592f2018-09-12 02:24:47 -0700497 private List<PiTableEntry> doDumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700498 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400499
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700500 log.debug("Dumping tables {} from {} (pipeconf {})...",
501 piTableIds, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400502
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700503 Set<Integer> tableIds = Sets.newHashSet();
504 if (piTableIds == null) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200505 // Dump all tables.
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700506 tableIds.add(0);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200507 } else {
508 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200509 if (browser == null) {
510 log.warn("Unable to get a P4Info browser for pipeconf {}", pipeconf);
511 return Collections.emptyList();
512 }
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700513 piTableIds.forEach(piTableId -> {
514 try {
515 tableIds.add(browser.tables().getByName(piTableId.id()).getPreamble().getId());
516 } catch (P4InfoBrowser.NotFoundException e) {
517 log.warn("Unable to dump table {}: {}", piTableId, e.getMessage());
518 }
519 });
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400520 }
521
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700522 if (tableIds.isEmpty()) {
523 return Collections.emptyList();
524 }
525
526 ReadRequest.Builder requestMsgBuilder = ReadRequest.newBuilder()
527 .setDeviceId(p4DeviceId);
528 tableIds.forEach(tableId -> requestMsgBuilder.addEntities(
529 Entity.newBuilder()
530 .setTableEntry(
531 TableEntry.newBuilder()
532 .setTableId(tableId)
533 .setIsDefaultAction(defaultEntries)
steven308017632e152018-10-20 00:51:08 +0800534 .setCounterData(P4RuntimeOuterClass.CounterData.getDefaultInstance())
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700535 .build())
536 .build())
537 .build());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400538
539 Iterator<ReadResponse> responses;
540 try {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700541 responses = blockingStub.read(requestMsgBuilder.build());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400542 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700543 checkGrpcException(e);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700544 log.warn("Unable to dump tables from {}: {}", deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400545 return Collections.emptyList();
546 }
547
548 Iterable<ReadResponse> responseIterable = () -> responses;
549 List<TableEntry> tableEntryMsgs = StreamSupport
550 .stream(responseIterable.spliterator(), false)
551 .map(ReadResponse::getEntitiesList)
552 .flatMap(List::stream)
553 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
554 .map(Entity::getTableEntry)
555 .collect(Collectors.toList());
556
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700557 log.debug("Retrieved {} entries from {} tables on {}...",
558 tableEntryMsgs.size(), tableIds.size(), deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400559
560 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
561 }
562
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200563 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
564 try {
565 //encode the PiPacketOperation into a PacketOut
566 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
567
568 //Build the request
569 StreamMessageRequest packetOutRequest = StreamMessageRequest
570 .newBuilder().setPacket(packetOut).build();
571
572 //Send the request
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700573 streamChannelManager.send(packetOutRequest);
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200574
575 } catch (P4InfoBrowser.NotFoundException e) {
576 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
577 log.debug("Exception", e);
578 return false;
579 }
580 return true;
581 }
582
Carmelo Casconea966c342017-07-30 01:56:30 -0400583 private void doPacketIn(PacketIn packetInMsg) {
584
585 // Retrieve the pipeconf for this client's device.
586 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
587 if (pipeconfService == null) {
588 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
589 }
590 final PiPipeconf pipeconf;
591 if (pipeconfService.ofDevice(deviceId).isPresent() &&
592 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
593 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
594 } else {
595 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
596 return;
597 }
598 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800599 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200600 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200601 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400602 log.debug("Received packet in: {}", event);
603 controller.postEvent(event);
604 }
605
Carmelo Casconee5b28722018-06-22 17:28:28 +0200606 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
607 // From the spec...
608 // - Election_id: The stream RPC with the highest election_id is the
609 // master. Switch populates with the highest election ID it
610 // has received from all connected controllers.
611 // - Status: Switch populates this with OK for the client that is the
612 // master, and with an error status for all other connected clients (at
613 // every mastership change).
614 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700615 return;
616 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700617 final boolean isMaster =
618 msg.getStatus().getCode() == Status.OK.getCode().value();
619 log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
620 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200621 controller.postEvent(new P4RuntimeEvent(
622 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
623 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700624 isClientMaster.set(isMaster);
Carmelo Casconea966c342017-07-30 01:56:30 -0400625 }
626
steven308017632e152018-10-20 00:51:08 +0800627 private List<PiCounterCell> doReadAllCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700628 List<PiCounterId> counterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700629 return doReadCounterEntities(
630 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
631 pipeconf);
632 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200633
steven308017632e152018-10-20 00:51:08 +0800634 private List<PiCounterCell> doReadCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700635 List<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700636 return doReadCounterEntities(
637 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
638 pipeconf);
639 }
640
steven308017632e152018-10-20 00:51:08 +0800641 private List<PiCounterCell> doReadCounterEntities(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700642 List<Entity> counterEntities, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700643
644 if (counterEntities.size() == 0) {
645 return Collections.emptyList();
646 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200647
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200648 final ReadRequest request = ReadRequest.newBuilder()
649 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700650 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200651 .build();
652
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200653 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200654 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200655 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200656 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700657 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800658 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200659 return Collections.emptyList();
660 }
661
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200662 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200663 .map(ReadResponse::getEntitiesList)
664 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200665 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200666
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700667 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200668 }
669
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700670 private boolean doWriteActionProfileMembers(List<PiActionProfileMember> members,
671 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700672 final List<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800673
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700674 for (PiActionProfileMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800675 try {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700676 actionProfileMembers.add(ActionProfileMemberEncoder.encode(member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800677 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700678 log.warn("Unable to encode action profile member, aborting {} operation: {} [{}]",
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800679 opType.name(), e.getMessage(), member.toString());
680 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700681 }
Yi Tseng82512da2017-08-16 19:46:36 -0700682 }
683
Carmelo Casconee44592f2018-09-12 02:24:47 -0700684 final List<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700685 .map(actionProfileMember ->
686 Update.newBuilder()
687 .setEntity(Entity.newBuilder()
688 .setActionProfileMember(actionProfileMember)
689 .build())
690 .setType(UPDATE_TYPES.get(opType))
691 .build())
692 .collect(Collectors.toList());
693
694 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200695 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700696 return true;
697 }
698
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700699 return write(updateMsgs, members, opType, "action profile member");
Yi Tseng82512da2017-08-16 19:46:36 -0700700 }
701
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700702 private List<PiActionProfileGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700703 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
704 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200705
706 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700707 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200708 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700709 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700710 }
711
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200712 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700713 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200714 actionProfileId = browser
715 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200716 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200717 .getPreamble()
718 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700719 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200720 log.warn("Unable to dump groups: {}", e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700721 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700722 }
723
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200724 // Prepare read request to read all groups from the given action profile.
725 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700726 .setDeviceId(p4DeviceId)
727 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200728 .setActionProfileGroup(
729 ActionProfileGroup.newBuilder()
730 .setActionProfileId(actionProfileId)
731 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700732 .build())
733 .build();
734
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200735 // Read groups.
736 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700737 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200738 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700739 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700740 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800741 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700742 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700743 }
744
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200745 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
746 .map(ReadResponse::getEntitiesList)
747 .flatMap(List::stream)
748 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
749 .map(Entity::getActionProfileGroup)
750 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700751
752 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200753 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700754
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200755 // Returned groups contain only a minimal description of their members.
756 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700757
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200758 // Keep a map of all member IDs for each group ID, will need it later.
759 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
760 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
761 g.getGroupId(),
762 g.getMembersList().stream()
763 .map(ActionProfileGroup.Member::getMemberId)
764 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700765
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200766 // Prepare one big read request to read all members in one shot.
767 final Set<Entity> entityMsgs = groupMsgs.stream()
768 .flatMap(g -> g.getMembersList().stream())
769 .map(ActionProfileGroup.Member::getMemberId)
770 // Prevent issuing many read requests for the same member.
771 .distinct()
772 .map(id -> ActionProfileMember.newBuilder()
773 .setActionProfileId(actionProfileId)
774 .setMemberId(id)
775 .build())
776 .map(m -> Entity.newBuilder()
777 .setActionProfileMember(m)
778 .build())
779 .collect(Collectors.toSet());
780 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
781 .addAllEntities(entityMsgs)
782 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700783
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200784 // Read members.
785 final Iterator<ReadResponse> memberResponses;
786 try {
787 memberResponses = blockingStub.read(memberRequestMsg);
788 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700789 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800790 log.warn("Unable to read members of action profile {} from {}: {}",
791 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200792 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700793 }
794
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200795 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
796 Tools.stream(() -> memberResponses)
797 .map(ReadResponse::getEntitiesList)
798 .flatMap(List::stream)
799 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
800 .map(Entity::getActionProfileMember)
801 .forEach(member -> groupIdToMemberIdsMap.asMap()
802 // Get all group IDs that contain this member.
803 .entrySet()
804 .stream()
805 .filter(entry -> entry.getValue().contains(member.getMemberId()))
806 .map(Map.Entry::getKey)
807 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
808
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700809 log.debug("Retrieved {} members from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200810 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
811
812 return groupMsgs.stream()
813 .map(groupMsg -> {
814 try {
815 return ActionProfileGroupEncoder.decode(groupMsg,
816 groupIdToMembersMap.get(groupMsg.getGroupId()),
817 pipeconf);
818 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
819 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
820 return null;
821 }
822 })
823 .filter(Objects::nonNull)
824 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700825 }
826
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700827 private List<PiActionProfileMemberId> doDumpActionProfileMemberIds(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700828 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
829
830 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
831 if (browser == null) {
832 log.warn("Unable to get a P4Info browser for pipeconf {}, " +
833 "aborting cleanup of action profile members",
834 pipeconf);
835 return Collections.emptyList();
836 }
837
838 final int p4ActProfId;
839 try {
840 p4ActProfId = browser
841 .actionProfiles()
842 .getByName(actionProfileId.id())
843 .getPreamble()
844 .getId();
845 } catch (P4InfoBrowser.NotFoundException e) {
846 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
847 return Collections.emptyList();
848 }
849
850 final ReadRequest memberRequestMsg = ReadRequest.newBuilder()
851 .setDeviceId(p4DeviceId)
852 .addEntities(Entity.newBuilder().setActionProfileMember(
853 ActionProfileMember.newBuilder()
854 .setActionProfileId(p4ActProfId)
855 .build()).build())
856 .build();
857
858 // Read members.
859 final Iterator<ReadResponse> memberResponses;
860 try {
861 memberResponses = blockingStub.read(memberRequestMsg);
862 } catch (StatusRuntimeException e) {
863 checkGrpcException(e);
864 log.warn("Unable to read members of action profile {} from {}: {}",
865 actionProfileId, deviceId, e.getMessage());
866 return Collections.emptyList();
867 }
868
869 return Tools.stream(() -> memberResponses)
870 .map(ReadResponse::getEntitiesList)
871 .flatMap(List::stream)
872 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
873 .map(Entity::getActionProfileMember)
874 // Perhaps not needed, but better to double check to avoid
875 // removing members of other groups.
876 .filter(m -> m.getActionProfileId() == p4ActProfId)
877 .map(ActionProfileMember::getMemberId)
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700878 .map(PiActionProfileMemberId::of)
Carmelo Casconee44592f2018-09-12 02:24:47 -0700879 .collect(Collectors.toList());
880 }
881
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700882 private List<PiActionProfileMemberId> doRemoveActionProfileMembers(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700883 PiActionProfileId actionProfileId,
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700884 List<PiActionProfileMemberId> memberIds,
Carmelo Casconee44592f2018-09-12 02:24:47 -0700885 PiPipeconf pipeconf) {
886
887 if (memberIds.isEmpty()) {
888 return Collections.emptyList();
889 }
890
891 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
892 if (browser == null) {
893 log.warn("Unable to get a P4Info browser for pipeconf {}, " +
894 "aborting cleanup of action profile members",
895 pipeconf);
896 return Collections.emptyList();
897 }
898
899 final int p4ActProfId;
900 try {
901 p4ActProfId = browser.actionProfiles()
902 .getByName(actionProfileId.id()).getPreamble().getId();
903 } catch (P4InfoBrowser.NotFoundException e) {
904 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
905 return Collections.emptyList();
906 }
907
908 final List<Update> updateMsgs = memberIds.stream()
909 .map(m -> ActionProfileMember.newBuilder()
910 .setActionProfileId(p4ActProfId)
911 .setMemberId(m.id()).build())
912 .map(m -> Entity.newBuilder().setActionProfileMember(m).build())
913 .map(e -> Update.newBuilder().setEntity(e)
914 .setType(Update.Type.DELETE).build())
915 .collect(Collectors.toList());
916
917 log.debug("Removing {} members of action profile '{}'...",
918 memberIds.size(), actionProfileId);
919
920 return writeAndReturnSuccessEntities(
921 updateMsgs, memberIds, WriteOperationType.DELETE,
922 "action profile members");
923 }
924
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700925 private boolean doWriteActionProfileGroup(
926 PiActionProfileGroup group, WriteOperationType opType, PiPipeconf pipeconf,
ghj0504520ed7340c2018-10-26 13:06:35 -0700927 int maxMemberSize) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200928 final ActionProfileGroup actionProfileGroup;
ghj0504520ed7340c2018-10-26 13:06:35 -0700929 if (opType == P4RuntimeClient.WriteOperationType.INSERT && maxMemberSize < group.members().size()) {
930 log.warn("Unable to encode group, since group member larger than maximum member size");
931 return false;
932 }
Yi Tseng82512da2017-08-16 19:46:36 -0700933 try {
ghj0504520ed7340c2018-10-26 13:06:35 -0700934 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf, maxMemberSize);
Yi Tseng82512da2017-08-16 19:46:36 -0700935 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800936 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700937 return false;
938 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200939
Carmelo Cascone58136812018-07-19 03:40:16 +0200940 final Update updateMsg = Update.newBuilder()
941 .setEntity(Entity.newBuilder()
942 .setActionProfileGroup(actionProfileGroup)
943 .build())
944 .setType(UPDATE_TYPES.get(opType))
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200945 .build();
Carmelo Cascone58136812018-07-19 03:40:16 +0200946
Carmelo Casconee44592f2018-09-12 02:24:47 -0700947 return write(singletonList(updateMsg), singletonList(group),
Carmelo Cascone58136812018-07-19 03:40:16 +0200948 opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700949 }
950
Carmelo Casconee44592f2018-09-12 02:24:47 -0700951 private List<PiMeterCellConfig> doReadAllMeterCells(
952 List<PiMeterId> meterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700953 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
954 meterIds, pipeconf), pipeconf);
955 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900956
Carmelo Casconee44592f2018-09-12 02:24:47 -0700957 private List<PiMeterCellConfig> doReadMeterCells(
958 List<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700959
Carmelo Casconee44592f2018-09-12 02:24:47 -0700960 final List<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900961 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700962 .withMeterCellId(cellId)
963 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900964 .collect(Collectors.toList());
965
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700966 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
967 piMeterCellConfigs, pipeconf), pipeconf);
968 }
969
Carmelo Casconee44592f2018-09-12 02:24:47 -0700970 private List<PiMeterCellConfig> doReadMeterEntities(
971 List<Entity> entitiesToRead, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700972
973 if (entitiesToRead.size() == 0) {
974 return Collections.emptyList();
975 }
976
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900977 final ReadRequest request = ReadRequest.newBuilder()
978 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700979 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900980 .build();
981
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900982 final Iterable<ReadResponse> responses;
983 try {
984 responses = () -> blockingStub.read(request);
985 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700986 checkGrpcException(e);
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700987 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900988 log.debug("exception", e);
989 return Collections.emptyList();
990 }
991
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700992 List<Entity> responseEntities = StreamSupport
993 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900994 .map(ReadResponse::getEntitiesList)
995 .flatMap(List::stream)
996 .collect(Collectors.toList());
997
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700998 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900999 }
1000
Carmelo Casconee44592f2018-09-12 02:24:47 -07001001 private boolean doWriteMeterCells(List<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001002
Carmelo Casconee44592f2018-09-12 02:24:47 -07001003 List<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001004 .stream()
1005 .map(meterEntryMsg ->
1006 Update.newBuilder()
1007 .setEntity(meterEntryMsg)
1008 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
1009 .build())
1010 .collect(Collectors.toList());
1011
1012 if (updateMsgs.size() == 0) {
1013 return true;
1014 }
1015
Carmelo Cascone58136812018-07-19 03:40:16 +02001016 return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
1017 }
1018
1019 private boolean doWriteMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -07001020 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +02001021 WriteOperationType opType) {
1022
1023 final List<Update> updateMsgs = entries.stream()
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001024 .map(piEntry -> {
1025 try {
1026 return MulticastGroupEntryCodec.encode(piEntry);
1027 } catch (EncodeException e) {
1028 log.warn("Unable to encode PiMulticastGroupEntry: {}", e.getMessage());
1029 return null;
1030 }
1031 })
1032 .filter(Objects::nonNull)
Carmelo Cascone58136812018-07-19 03:40:16 +02001033 .map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
1034 .setMulticastGroupEntry(mcMsg)
1035 .build())
1036 .map(preMsg -> Entity.newBuilder()
1037 .setPacketReplicationEngineEntry(preMsg)
1038 .build())
1039 .map(entityMsg -> Update.newBuilder()
1040 .setEntity(entityMsg)
1041 .setType(UPDATE_TYPES.get(opType))
1042 .build())
1043 .collect(Collectors.toList());
1044 return write(updateMsgs, entries, opType, "multicast group entry");
1045 }
1046
Carmelo Casconee44592f2018-09-12 02:24:47 -07001047 private List<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +02001048
1049 final Entity entity = Entity.newBuilder()
1050 .setPacketReplicationEngineEntry(
1051 PacketReplicationEngineEntry.newBuilder()
1052 .setMulticastGroupEntry(
1053 MulticastGroupEntry.newBuilder()
1054 .build())
1055 .build())
1056 .build();
1057
1058 final ReadRequest req = ReadRequest.newBuilder()
1059 .setDeviceId(p4DeviceId)
1060 .addEntities(entity)
1061 .build();
1062
1063 Iterator<ReadResponse> responses;
1064 try {
1065 responses = blockingStub.read(req);
1066 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001067 checkGrpcException(e);
Carmelo Cascone58136812018-07-19 03:40:16 +02001068 log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
1069 return Collections.emptyList();
1070 }
1071
1072 Iterable<ReadResponse> responseIterable = () -> responses;
1073 final List<PiMulticastGroupEntry> mcEntries = StreamSupport
1074 .stream(responseIterable.spliterator(), false)
1075 .map(ReadResponse::getEntitiesList)
1076 .flatMap(List::stream)
1077 .filter(e -> e.getEntityCase()
1078 .equals(PACKET_REPLICATION_ENGINE_ENTRY))
1079 .map(Entity::getPacketReplicationEngineEntry)
1080 .filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
1081 .map(PacketReplicationEngineEntry::getMulticastGroupEntry)
1082 .map(MulticastGroupEntryCodec::decode)
1083 .collect(Collectors.toList());
1084
1085 log.debug("Retrieved {} multicast group entries from {}...",
1086 mcEntries.size(), deviceId);
1087
1088 return mcEntries;
1089 }
1090
Carmelo Casconee44592f2018-09-12 02:24:47 -07001091 private <T> boolean write(List<Update> updates,
1092 List<T> writeEntities,
1093 WriteOperationType opType,
1094 String entryType) {
1095 // True if all entities were successfully written.
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001096 return writeAndReturnSuccessEntities(updates, writeEntities, opType, entryType)
1097 .size() == writeEntities.size();
Carmelo Casconee44592f2018-09-12 02:24:47 -07001098 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001099
Carmelo Casconee44592f2018-09-12 02:24:47 -07001100 private <T> List<T> writeAndReturnSuccessEntities(
1101 List<Update> updates, List<T> writeEntities,
1102 WriteOperationType opType, String entryType) {
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001103 if (updates.isEmpty()) {
1104 return Collections.emptyList();
1105 }
1106 if (updates.size() != writeEntities.size()) {
1107 log.error("Cannot perform {} operation, provided {} " +
1108 "update messages for {} {} - BUG?",
1109 opType, updates.size(), writeEntities.size(), entryType);
1110 return Collections.emptyList();
1111 }
Carmelo Casconee44592f2018-09-12 02:24:47 -07001112 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001113 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone58136812018-07-19 03:40:16 +02001114 blockingStub.write(writeRequest(updates));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001115 return writeEntities;
Carmelo Cascone58136812018-07-19 03:40:16 +02001116 } catch (StatusRuntimeException e) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001117 return checkAndLogWriteErrors(writeEntities, e, opType, entryType);
Carmelo Cascone58136812018-07-19 03:40:16 +02001118 }
1119 }
1120
1121 private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
1122 return WriteRequest.newBuilder()
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001123 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +02001124 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001125 .addAllUpdates(updateMsgs)
1126 .build();
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001127 }
1128
Yi Tseng2a340f72018-11-02 16:52:47 -07001129 protected Void doShutdown() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001130 streamChannelManager.complete();
Yi Tseng2a340f72018-11-02 16:52:47 -07001131 return super.doShutdown();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001132 }
1133
Carmelo Casconee44592f2018-09-12 02:24:47 -07001134 // Returns the collection of succesfully write entities.
1135 private <T> List<T> checkAndLogWriteErrors(
1136 List<T> writeEntities, StatusRuntimeException ex,
Carmelo Casconee5b28722018-06-22 17:28:28 +02001137 WriteOperationType opType, String entryType) {
1138
1139 checkGrpcException(ex);
1140
Carmelo Cascone943c6642018-09-11 13:01:03 -07001141 final List<P4RuntimeOuterClass.Error> errors = extractWriteErrorDetails(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001142
Carmelo Cascone943c6642018-09-11 13:01:03 -07001143 if (errors.isEmpty()) {
1144 final String description = ex.getStatus().getDescription();
1145 log.warn("Unable to {} {} {}(s) on {}: {}",
Carmelo Cascone50d195f2018-09-11 13:26:38 -07001146 opType.name(), writeEntities.size(), entryType, deviceId,
1147 ex.getStatus().getCode().name(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001148 description == null ? "" : " - " + description);
Carmelo Casconee44592f2018-09-12 02:24:47 -07001149 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001150 }
1151
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001152 if (errors.size() == writeEntities.size()) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001153 List<T> okEntities = Lists.newArrayList();
1154 Iterator<T> entityIterator = writeEntities.iterator();
1155 for (P4RuntimeOuterClass.Error error : errors) {
1156 T entity = entityIterator.next();
1157 if (error.getCanonicalCode() != Status.OK.getCode().value()) {
1158 log.warn("Unable to {} {} on {}: {} [{}]",
1159 opType.name(), entryType, deviceId,
1160 parseP4Error(error), entity.toString());
1161 } else {
1162 okEntities.add(entity);
1163 }
1164 }
1165 return okEntities;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001166 } else {
Carmelo Casconeb5324e72018-11-25 02:26:32 -08001167 log.warn("Unable to reconcile error details to {} updates " +
Carmelo Casconee44592f2018-09-12 02:24:47 -07001168 "(sent {} updates, but device returned {} errors)",
1169 entryType, writeEntities.size(), errors.size());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001170 errors.stream()
1171 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
1172 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
Carmelo Cascone58136812018-07-19 03:40:16 +02001173 opType.name(), entryType, parseP4Error(err)));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001174 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001175 }
1176 }
1177
1178 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
Carmelo Cascone943c6642018-09-11 13:01:03 -07001179 StatusRuntimeException ex) {
1180 if (!ex.getTrailers().containsKey(STATUS_DETAILS_KEY)) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001181 return Collections.emptyList();
1182 }
Carmelo Cascone943c6642018-09-11 13:01:03 -07001183 com.google.rpc.Status status = ex.getTrailers().get(STATUS_DETAILS_KEY);
1184 if (status == null) {
1185 return Collections.emptyList();
1186 }
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001187 return status.getDetailsList().stream()
1188 .map(any -> {
1189 try {
1190 return any.unpack(P4RuntimeOuterClass.Error.class);
1191 } catch (InvalidProtocolBufferException e) {
1192 log.warn("Unable to unpack P4Runtime Error: {}",
1193 any.toString());
1194 return null;
1195 }
1196 })
1197 .filter(Objects::nonNull)
1198 .collect(Collectors.toList());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001199 }
1200
1201 private String parseP4Error(P4RuntimeOuterClass.Error err) {
Carmelo Cascone943c6642018-09-11 13:01:03 -07001202 return format("%s %s%s (%s:%d)",
1203 Status.fromCodeValue(err.getCanonicalCode()).getCode(),
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001204 err.getMessage(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001205 err.hasDetails() ? ", " + err.getDetails().toString() : "",
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001206 err.getSpace(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001207 err.getCode());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001208 }
1209
Carmelo Casconee5b28722018-06-22 17:28:28 +02001210 private void checkGrpcException(StatusRuntimeException ex) {
1211 switch (ex.getStatus().getCode()) {
1212 case OK:
1213 break;
1214 case CANCELLED:
1215 break;
1216 case UNKNOWN:
1217 break;
1218 case INVALID_ARGUMENT:
1219 break;
1220 case DEADLINE_EXCEEDED:
1221 break;
1222 case NOT_FOUND:
1223 break;
1224 case ALREADY_EXISTS:
1225 break;
1226 case PERMISSION_DENIED:
1227 // Notify upper layers that this node is not master.
1228 controller.postEvent(new P4RuntimeEvent(
Carmelo Casconede3b6842018-09-05 17:45:10 -07001229 P4RuntimeEvent.Type.PERMISSION_DENIED,
1230 new BaseP4RuntimeEventSubject(deviceId)));
Carmelo Casconee5b28722018-06-22 17:28:28 +02001231 break;
1232 case RESOURCE_EXHAUSTED:
1233 break;
1234 case FAILED_PRECONDITION:
1235 break;
1236 case ABORTED:
1237 break;
1238 case OUT_OF_RANGE:
1239 break;
1240 case UNIMPLEMENTED:
1241 break;
1242 case INTERNAL:
1243 break;
1244 case UNAVAILABLE:
1245 // Channel might be closed.
1246 controller.postEvent(new P4RuntimeEvent(
1247 P4RuntimeEvent.Type.CHANNEL_EVENT,
1248 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1249 break;
1250 case DATA_LOSS:
1251 break;
1252 case UNAUTHENTICATED:
1253 break;
1254 default:
1255 break;
1256 }
1257 }
1258
1259 private Uint128 bigIntegerToUint128(BigInteger value) {
1260 final byte[] arr = value.toByteArray();
1261 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1262 .put(new byte[Long.BYTES * 2 - arr.length])
1263 .put(arr);
1264 bb.rewind();
1265 return Uint128.newBuilder()
1266 .setHigh(bb.getLong())
1267 .setLow(bb.getLong())
1268 .build();
1269 }
1270
1271 private BigInteger uint128ToBigInteger(Uint128 value) {
1272 return new BigInteger(
1273 ByteBuffer.allocate(Long.BYTES * 2)
1274 .putLong(value.getHigh())
1275 .putLong(value.getLow())
1276 .array());
1277 }
1278
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001279 /**
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001280 * A manager for the P4Runtime stream channel that opportunistically creates
1281 * new stream RCP stubs (e.g. when one fails because of errors) and posts
1282 * channel events via the P4Runtime controller.
1283 */
1284 private final class StreamChannelManager {
1285
1286 private final ManagedChannel channel;
1287 private final AtomicBoolean open;
1288 private final StreamObserver<StreamMessageResponse> responseObserver;
1289 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
1290
1291 private StreamChannelManager(ManagedChannel channel) {
1292 this.channel = channel;
1293 this.responseObserver = new InternalStreamResponseObserver(this);
1294 this.open = new AtomicBoolean(false);
1295 }
1296
1297 private void initIfRequired() {
1298 if (requestObserver == null) {
1299 log.debug("Creating new stream channel for {}...", deviceId);
1300 requestObserver =
1301 (ClientCallStreamObserver<StreamMessageRequest>)
1302 P4RuntimeGrpc.newStub(channel)
1303 .streamChannel(responseObserver);
1304 open.set(false);
1305 }
1306 }
1307
1308 public boolean send(StreamMessageRequest value) {
1309 synchronized (this) {
1310 initIfRequired();
1311 try {
1312 requestObserver.onNext(value);
1313 // FIXME
1314 // signalOpen();
1315 return true;
1316 } catch (Throwable ex) {
1317 if (ex instanceof StatusRuntimeException) {
1318 log.warn("Unable to send {} to {}: {}",
1319 value.getUpdateCase().toString(), deviceId, ex.getMessage());
1320 } else {
1321 log.warn(format(
1322 "Exception while sending %s to %s",
1323 value.getUpdateCase().toString(), deviceId), ex);
1324 }
1325 complete();
1326 return false;
1327 }
1328 }
1329 }
1330
1331 public void complete() {
1332 synchronized (this) {
1333 signalClosed();
1334 if (requestObserver != null) {
1335 requestObserver.onCompleted();
1336 requestObserver.cancel("Terminated", null);
1337 requestObserver = null;
1338 }
1339 }
1340 }
1341
1342 void signalOpen() {
1343 synchronized (this) {
1344 final boolean wasOpen = open.getAndSet(true);
1345 if (!wasOpen) {
1346 controller.postEvent(new P4RuntimeEvent(
1347 P4RuntimeEvent.Type.CHANNEL_EVENT,
1348 new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
1349 }
1350 }
1351 }
1352
1353 void signalClosed() {
1354 synchronized (this) {
1355 final boolean wasOpen = open.getAndSet(false);
1356 if (wasOpen) {
1357 controller.postEvent(new P4RuntimeEvent(
1358 P4RuntimeEvent.Type.CHANNEL_EVENT,
1359 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
1360 }
1361 }
1362 }
1363
1364 public boolean isOpen() {
1365 return open.get();
1366 }
1367 }
1368
1369 /**
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001370 * Handles messages received from the device on the stream channel.
1371 */
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001372 private final class InternalStreamResponseObserver
Carmelo Casconee5b28722018-06-22 17:28:28 +02001373 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001374
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001375 private final StreamChannelManager streamChannelManager;
1376
1377 private InternalStreamResponseObserver(
1378 StreamChannelManager streamChannelManager) {
1379 this.streamChannelManager = streamChannelManager;
1380 }
1381
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001382 @Override
1383 public void onNext(StreamMessageResponse message) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001384 streamChannelManager.signalOpen();
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001385 executorService.submit(() -> doNext(message));
1386 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001387
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001388 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001389 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001390 log.debug("Received message on stream channel from {}: {}",
1391 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001392 switch (message.getUpdateCase()) {
1393 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001394 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001395 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001396 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001397 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001398 return;
1399 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001400 log.warn("Unrecognized stream message from {}: {}",
1401 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001402 }
1403 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001404 log.error("Exception while processing stream message from {}",
1405 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001406 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001407 }
1408
1409 @Override
1410 public void onError(Throwable throwable) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001411 if (throwable instanceof StatusRuntimeException) {
1412 StatusRuntimeException sre = (StatusRuntimeException) throwable;
1413 if (sre.getStatus().getCause() instanceof ConnectException) {
1414 log.warn("Device {} is unreachable ({})",
1415 deviceId, sre.getCause().getMessage());
1416 } else {
1417 log.warn("Received error on stream channel for {}: {}",
1418 deviceId, throwable.getMessage());
1419 }
1420 } else {
1421 log.warn(format("Received exception on stream channel for %s",
1422 deviceId), throwable);
1423 }
1424 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001425 }
1426
1427 @Override
1428 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001429 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001430 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001431 }
1432 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001433}