blob: 6ac478f0505fbbed4f201daa90a39ff6db8c6963 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone8d99b172017-07-18 17:26:31 -040019import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070020import com.google.common.collect.Lists;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070021import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040022import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080023import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import io.grpc.ManagedChannel;
Carmelo Cascone943c6642018-09-11 13:01:03 -070025import io.grpc.Metadata;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import io.grpc.Status;
27import io.grpc.StatusRuntimeException;
Carmelo Cascone943c6642018-09-11 13:01:03 -070028import io.grpc.protobuf.lite.ProtoLiteUtils;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070029import io.grpc.stub.ClientCallStreamObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040030import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070032import org.onlab.util.Tools;
Yi Tseng2a340f72018-11-02 16:52:47 -070033import org.onosproject.grpc.ctl.AbstractGrpcClient;
Carmelo Cascone87892e22017-11-13 16:01:29 -080034import org.onosproject.net.pi.model.PiActionProfileId;
35import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070036import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020037import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080038import org.onosproject.net.pi.model.PiTableId;
Carmelo Casconecb4327a2018-09-11 15:17:23 -070039import org.onosproject.net.pi.runtime.PiActionProfileGroup;
40import org.onosproject.net.pi.runtime.PiActionProfileMember;
41import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
steven308017632e152018-10-20 00:51:08 +080042import org.onosproject.net.pi.runtime.PiCounterCell;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020043import org.onosproject.net.pi.runtime.PiCounterCellId;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090044import org.onosproject.net.pi.runtime.PiMeterCellConfig;
45import org.onosproject.net.pi.runtime.PiMeterCellId;
Carmelo Cascone58136812018-07-19 03:40:16 +020046import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
Andrea Campanella432f7182017-07-14 18:43:27 +020047import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040048import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080049import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import org.onosproject.p4runtime.api.P4RuntimeClient;
Yi Tseng2a340f72018-11-02 16:52:47 -070051import org.onosproject.p4runtime.api.P4RuntimeClientKey;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040052import org.onosproject.p4runtime.api.P4RuntimeEvent;
Carmelo Casconee5b28722018-06-22 17:28:28 +020053import p4.config.v1.P4InfoOuterClass.P4Info;
54import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020055import p4.v1.P4RuntimeGrpc;
56import p4.v1.P4RuntimeOuterClass;
57import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
58import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
59import p4.v1.P4RuntimeOuterClass.Entity;
60import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070061import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
62import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020063import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone58136812018-07-19 03:40:16 +020064import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
65import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020066import p4.v1.P4RuntimeOuterClass.ReadRequest;
67import p4.v1.P4RuntimeOuterClass.ReadResponse;
68import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
69import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
70import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
71import p4.v1.P4RuntimeOuterClass.TableEntry;
72import p4.v1.P4RuntimeOuterClass.Uint128;
73import p4.v1.P4RuntimeOuterClass.Update;
74import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040075
Carmelo Casconee5b28722018-06-22 17:28:28 +020076import java.math.BigInteger;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070077import java.net.ConnectException;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070078import java.nio.ByteBuffer;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040079import java.util.Collections;
80import java.util.Iterator;
81import java.util.List;
82import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020083import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020084import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085import java.util.concurrent.CompletableFuture;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070086import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Cascone99c59db2019-01-17 15:39:35 -080087import java.util.stream.Stream;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040088
Carmelo Cascone99c59db2019-01-17 15:39:35 -080089import static com.google.common.base.Preconditions.checkArgument;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070090import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080091import static java.lang.String.format;
Carmelo Casconee44592f2018-09-12 02:24:47 -070092import static java.util.Collections.singletonList;
Carmelo Cascone99c59db2019-01-17 15:39:35 -080093import static java.util.stream.Collectors.joining;
94import static java.util.stream.Collectors.toList;
95import static org.onosproject.p4runtime.ctl.P4RuntimeCodecs.CODECS;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020096import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
97import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone99c59db2019-01-17 15:39:35 -080098import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.COUNTER_ENTRY;
99import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.DIRECT_COUNTER_ENTRY;
100import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.DIRECT_METER_ENTRY;
101import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.METER_ENTRY;
Carmelo Cascone58136812018-07-19 03:40:16 +0200102import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200103import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200104import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200105import static p4.v1.P4RuntimeOuterClass.PacketOut;
Carmelo Cascone58136812018-07-19 03:40:16 +0200106import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200107import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400108
109/**
110 * Implementation of a P4Runtime client.
111 */
Yi Tseng2a340f72018-11-02 16:52:47 -0700112final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400113
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800114 private static final String MISSING_P4INFO_BROWSER = "Unable to get a P4Info browser for pipeconf {}";
115
Carmelo Cascone943c6642018-09-11 13:01:03 -0700116 private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800117 Metadata.Key.of(
118 "grpc-status-details-bin",
119 ProtoLiteUtils.metadataMarshaller(
120 com.google.rpc.Status.getDefaultInstance()));
Carmelo Cascone943c6642018-09-11 13:01:03 -0700121
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400122 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
123 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
124 WriteOperationType.INSERT, Update.Type.INSERT,
125 WriteOperationType.MODIFY, Update.Type.MODIFY,
126 WriteOperationType.DELETE, Update.Type.DELETE
127 );
128
Carmelo Casconef423bec2017-08-30 01:56:25 +0200129 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400130 private final P4RuntimeControllerImpl controller;
131 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700132 private StreamChannelManager streamChannelManager;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400133
Carmelo Casconee5b28722018-06-22 17:28:28 +0200134 // Used by this client for write requests.
135 private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700136
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700137 private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
138
Yi Tseng82512da2017-08-16 19:46:36 -0700139 /**
140 * Default constructor.
141 *
Yi Tseng2a340f72018-11-02 16:52:47 -0700142 * @param clientKey the client key of this client
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200143 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700144 * @param controller runtime client controller
145 */
Yi Tseng2a340f72018-11-02 16:52:47 -0700146 P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
Carmelo Casconef423bec2017-08-30 01:56:25 +0200147 P4RuntimeControllerImpl controller) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700148
Yi Tsengd7716482018-10-31 15:34:30 -0700149 super(clientKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700150 this.p4DeviceId = clientKey.p4DeviceId();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400151 this.controller = controller;
Yi Tseng2a340f72018-11-02 16:52:47 -0700152
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200154 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700155 this.streamChannelManager = new StreamChannelManager(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400156 }
157
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400158 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700159 public CompletableFuture<Boolean> startStreamChannel() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700160 return supplyInContext(() -> sendMasterArbitrationUpdate(false),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200161 "start-initStreamChannel");
162 }
163
164 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200165 public CompletableFuture<Boolean> becomeMaster() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700166 return supplyInContext(() -> sendMasterArbitrationUpdate(true),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200167 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400168 }
169
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400170 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700171 public boolean isMaster() {
172 return streamChannelManager.isOpen() && isClientMaster.get();
173 }
174
175 @Override
176 public boolean isStreamChannelOpen() {
177 return streamChannelManager.isOpen();
178 }
179
180 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700181 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
182 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400183 }
184
185 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700186 public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
187 return doIsPipelineConfigSet(pipeconf, deviceData);
188 }
189
190 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700191 public CompletableFuture<Boolean> writeTableEntries(List<PiTableEntry> piTableEntries,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400192 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200193 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
194 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400195 }
196
197 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700198 public CompletableFuture<List<PiTableEntry>> dumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700199 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
200 return supplyInContext(() -> doDumpTables(piTableIds, defaultEntries, pipeconf),
201 "dumpTables-" + piTableIds.hashCode());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400202 }
203
204 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700205 public CompletableFuture<List<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700206 return supplyInContext(() -> doDumpTables(null, false, pipeconf), "dumpAllTables");
Carmelo Casconee5b28722018-06-22 17:28:28 +0200207 }
208
209 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200210 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200211 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200212 }
213
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200214 @Override
steven308017632e152018-10-20 00:51:08 +0800215 public CompletableFuture<List<PiCounterCell>> readCounterCells(Set<PiCounterCellId> cellIds,
216 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700217 return supplyInContext(() -> doReadCounterCells(Lists.newArrayList(cellIds), pipeconf),
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200218 "readCounterCells-" + cellIds.hashCode());
219 }
220
221 @Override
steven308017632e152018-10-20 00:51:08 +0800222 public CompletableFuture<List<PiCounterCell>> readAllCounterCells(Set<PiCounterId> counterIds,
223 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700224 return supplyInContext(() -> doReadAllCounterCells(Lists.newArrayList(counterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700225 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200226 }
227
Yi Tseng82512da2017-08-16 19:46:36 -0700228 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700229 public CompletableFuture<Boolean> writeActionProfileMembers(List<PiActionProfileMember> members,
230 WriteOperationType opType,
231 PiPipeconf pipeconf) {
232 return supplyInContext(() -> doWriteActionProfileMembers(members, opType, pipeconf),
233 "writeActionProfileMembers-" + opType.name());
234 }
235
236
237 @Override
238 public CompletableFuture<Boolean> writeActionProfileGroup(PiActionProfileGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700239 WriteOperationType opType,
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800240 PiPipeconf pipeconf) {
241 return supplyInContext(() -> doWriteActionProfileGroup(group, opType, pipeconf),
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700242 "writeActionProfileGroup-" + opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700243 }
244
245 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700246 public CompletableFuture<List<PiActionProfileGroup>> dumpActionProfileGroups(
247 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700248 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700249 "dumpActionProfileGroups-" + actionProfileId.id());
Yi Tseng82512da2017-08-16 19:46:36 -0700250 }
251
Yi Tseng3e7f1452017-10-20 10:31:53 -0700252 @Override
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800253 public CompletableFuture<List<PiActionProfileMember>> dumpActionProfileMembers(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700254 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800255 return supplyInContext(() -> doDumpActionProfileMembers(actionProfileId, pipeconf),
256 "dumpActionProfileMembers-" + actionProfileId.id());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700257 }
258
259 @Override
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700260 public CompletableFuture<List<PiActionProfileMemberId>> removeActionProfileMembers(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700261 PiActionProfileId actionProfileId,
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700262 List<PiActionProfileMemberId> memberIds,
Carmelo Casconee44592f2018-09-12 02:24:47 -0700263 PiPipeconf pipeconf) {
264 return supplyInContext(
265 () -> doRemoveActionProfileMembers(actionProfileId, memberIds, pipeconf),
266 "cleanupActionProfileMembers-" + actionProfileId.id());
267 }
268
269 @Override
270 public CompletableFuture<Boolean> writeMeterCells(List<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900271
272 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
273 "writeMeterCells");
274 }
275
276 @Override
Carmelo Cascone58136812018-07-19 03:40:16 +0200277 public CompletableFuture<Boolean> writePreMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700278 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +0200279 WriteOperationType opType) {
280 return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
281 "writePreMulticastGroupEntries");
282 }
283
284 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700285 public CompletableFuture<List<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +0200286 return supplyInContext(this::doReadAllMulticastGroupEntries,
287 "readAllMulticastGroupEntries");
288 }
289
290 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700291 public CompletableFuture<List<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
292 PiPipeconf pipeconf) {
293 return supplyInContext(() -> doReadMeterCells(Lists.newArrayList(cellIds), pipeconf),
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900294 "readMeterCells-" + cellIds.hashCode());
295 }
296
297 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700298 public CompletableFuture<List<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
299 PiPipeconf pipeconf) {
300 return supplyInContext(() -> doReadAllMeterCells(Lists.newArrayList(meterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700301 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900302 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700303
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400304 /* Blocking method implementations below */
305
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700306 private boolean sendMasterArbitrationUpdate(boolean asMaster) {
307 BigInteger newId = controller.newMasterElectionId(deviceId);
308 if (asMaster) {
309 // Becoming master is a race. Here we increase our chances of win
310 // against other ONOS nodes in the cluster that are calling start()
311 // (which is used to start the stream RPC session, not to become
312 // master).
313 newId = newId.add(BigInteger.valueOf(1000));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200314 }
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700315 final Uint128 idMsg = bigIntegerToUint128(
316 controller.newMasterElectionId(deviceId));
Andrea Campanella1e573442018-05-17 17:07:13 +0200317
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700318 log.debug("Sending arbitration update to {}... electionId={}",
319 deviceId, newId);
320
321 streamChannelManager.send(
322 StreamMessageRequest.newBuilder()
323 .setArbitration(
324 MasterArbitrationUpdate
325 .newBuilder()
326 .setDeviceId(p4DeviceId)
327 .setElectionId(idMsg)
328 .build())
329 .build());
330 clientElectionId = idMsg;
331 return true;
332 }
333
334 private ForwardingPipelineConfig getPipelineConfig(
335 PiPipeconf pipeconf, ByteBuffer deviceData) {
336 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
337 if (p4Info == null) {
338 // Problem logged by PipeconfHelper.
339 return null;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700340 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700341
ghj050452092a48bf2018-10-22 10:50:41 -0700342 ForwardingPipelineConfig.Cookie pipeconfCookie = ForwardingPipelineConfig.Cookie
343 .newBuilder()
ghj0504520ec1a4202018-10-22 10:50:41 -0700344 .setCookie(pipeconf.fingerprint())
ghj050452092a48bf2018-10-22 10:50:41 -0700345 .build();
346
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700347 // FIXME: This is specific to PI P4Runtime implementation.
348 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
349 .newBuilder()
350 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
351 .setReassign(true)
352 .setDeviceData(ByteString.copyFrom(deviceData))
353 .build();
354
355 return ForwardingPipelineConfig
356 .newBuilder()
357 .setP4Info(p4Info)
358 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
ghj050452092a48bf2018-10-22 10:50:41 -0700359 .setCookie(pipeconfCookie)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700360 .build();
361 }
362
363 private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
364
365 GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
366 .newBuilder()
367 .setDeviceId(p4DeviceId)
ghj050452092a48bf2018-10-22 10:50:41 -0700368 .setResponseType(GetForwardingPipelineConfigRequest
369 .ResponseType.COOKIE_ONLY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700370 .build();
371
372 GetForwardingPipelineConfigResponse resp;
373 try {
374 resp = this.blockingStub
375 .getForwardingPipelineConfig(request);
376 } catch (StatusRuntimeException ex) {
377 checkGrpcException(ex);
378 // FAILED_PRECONDITION means that a pipeline config was not set in
379 // the first place. Don't bother logging.
380 if (!ex.getStatus().getCode()
381 .equals(Status.FAILED_PRECONDITION.getCode())) {
382 log.warn("Unable to get pipeline config from {}: {}",
383 deviceId, ex.getMessage());
384 }
385 return false;
386 }
ghj050452092a48bf2018-10-22 10:50:41 -0700387 if (!resp.getConfig().hasCookie()) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700388 log.warn("{} returned GetForwardingPipelineConfigResponse " +
Carmelo Casconecb831812018-11-29 13:19:39 -0800389 "with 'cookie' field unset. " +
390 "Will try by comparing 'device_data'...",
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700391 deviceId);
Carmelo Casconecb831812018-11-29 13:19:39 -0800392 return doIsPipelineConfigSetWithData(pipeconf, deviceData);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700393 }
ghj050452092a48bf2018-10-22 10:50:41 -0700394
ghj0504520ec1a4202018-10-22 10:50:41 -0700395 return resp.getConfig().getCookie().getCookie() == pipeconf.fingerprint();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700396 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200397
Carmelo Casconecb831812018-11-29 13:19:39 -0800398 private boolean doIsPipelineConfigSetWithData(PiPipeconf pipeconf, ByteBuffer deviceData) {
399
400 GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
401 .newBuilder()
402 .setDeviceId(p4DeviceId)
403 .build();
404
405 GetForwardingPipelineConfigResponse resp;
406 try {
407 resp = this.blockingStub
408 .getForwardingPipelineConfig(request);
409 } catch (StatusRuntimeException ex) {
410 checkGrpcException(ex);
411 return false;
412 }
413
414 ForwardingPipelineConfig expectedConfig = getPipelineConfig(
415 pipeconf, deviceData);
416
417 if (expectedConfig == null) {
418 return false;
419 }
420 if (!resp.hasConfig()) {
421 log.warn("{} returned GetForwardingPipelineConfigResponse " +
422 "with 'config' field unset",
423 deviceId);
424 return false;
425 }
426 if (resp.getConfig().getP4DeviceConfig().isEmpty()
427 && !expectedConfig.getP4DeviceConfig().isEmpty()) {
428 // Don't bother with a warn or error since we don't really allow
429 // updating the pipeline to a different one. So the P4Info should be
430 // enough for us.
431 log.debug("{} returned GetForwardingPipelineConfigResponse " +
432 "with empty 'p4_device_config' field, " +
433 "equality will be based only on P4Info",
434 deviceId);
435 return resp.getConfig().getP4Info().equals(
436 expectedConfig.getP4Info());
437 } else {
438 return resp.getConfig().getP4DeviceConfig()
439 .equals(expectedConfig.getP4DeviceConfig())
440 && resp.getConfig().getP4Info()
441 .equals(expectedConfig.getP4Info());
442 }
443 }
444
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700445 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400446
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700447 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
448
449 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400450
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700451 ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
452
453 if (pipelineConfig == null) {
454 // Error logged in getPipelineConfig()
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400455 return false;
456 }
457
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400458 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
459 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100460 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200461 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400462 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100463 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400464 .build();
465
466 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700467 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400468 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700469 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400470 } catch (StatusRuntimeException ex) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700471 checkGrpcException(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800472 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400473 return false;
474 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400475 }
476
Carmelo Casconee44592f2018-09-12 02:24:47 -0700477 private boolean doWriteTableEntries(List<PiTableEntry> piTableEntries, WriteOperationType opType,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400478 PiPipeconf pipeconf) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800479 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400480 return true;
481 }
482
Carmelo Casconee44592f2018-09-12 02:24:47 -0700483 List<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800484 try {
485 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
486 .stream()
487 .map(tableEntryMsg ->
488 Update.newBuilder()
489 .setEntity(Entity.newBuilder()
490 .setTableEntry(tableEntryMsg)
491 .build())
492 .setType(UPDATE_TYPES.get(opType))
493 .build())
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800494 .collect(toList());
495 } catch (CodecException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800496 log.error("Unable to encode table entries, aborting {} operation: {}",
497 opType.name(), e.getMessage());
498 return false;
499 }
500
Carmelo Cascone58136812018-07-19 03:40:16 +0200501 return write(updateMsgs, piTableEntries, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400502 }
503
Carmelo Casconee44592f2018-09-12 02:24:47 -0700504 private List<PiTableEntry> doDumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700505 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400506
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700507 log.debug("Dumping tables {} from {} (pipeconf {})...",
508 piTableIds, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400509
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700510 Set<Integer> tableIds = Sets.newHashSet();
511 if (piTableIds == null) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200512 // Dump all tables.
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700513 tableIds.add(0);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200514 } else {
515 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200516 if (browser == null) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800517 log.error(MISSING_P4INFO_BROWSER, pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200518 return Collections.emptyList();
519 }
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700520 piTableIds.forEach(piTableId -> {
521 try {
522 tableIds.add(browser.tables().getByName(piTableId.id()).getPreamble().getId());
523 } catch (P4InfoBrowser.NotFoundException e) {
524 log.warn("Unable to dump table {}: {}", piTableId, e.getMessage());
525 }
526 });
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400527 }
528
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700529 if (tableIds.isEmpty()) {
530 return Collections.emptyList();
531 }
532
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800533 final List<Entity> entities = tableIds.stream()
534 .map(tableId -> TableEntry.newBuilder()
535 .setTableId(tableId)
536 .setIsDefaultAction(defaultEntries)
537 .setCounterData(P4RuntimeOuterClass.CounterData.getDefaultInstance())
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700538 .build())
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800539 .map(e -> Entity.newBuilder().setTableEntry(e).build())
540 .collect(toList());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400541
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800542 final List<TableEntry> tableEntryMsgs = blockingRead(entities, TABLE_ENTRY)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400543 .map(Entity::getTableEntry)
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800544 .collect(toList());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400545
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700546 log.debug("Retrieved {} entries from {} tables on {}...",
547 tableEntryMsgs.size(), tableIds.size(), deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400548
549 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
550 }
551
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200552 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
553 try {
554 //encode the PiPacketOperation into a PacketOut
555 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
556
557 //Build the request
558 StreamMessageRequest packetOutRequest = StreamMessageRequest
559 .newBuilder().setPacket(packetOut).build();
560
561 //Send the request
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700562 streamChannelManager.send(packetOutRequest);
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200563
564 } catch (P4InfoBrowser.NotFoundException e) {
565 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
566 log.debug("Exception", e);
567 return false;
568 }
569 return true;
570 }
571
Carmelo Casconea966c342017-07-30 01:56:30 -0400572 private void doPacketIn(PacketIn packetInMsg) {
573
574 // Retrieve the pipeconf for this client's device.
575 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
576 if (pipeconfService == null) {
577 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
578 }
579 final PiPipeconf pipeconf;
580 if (pipeconfService.ofDevice(deviceId).isPresent() &&
581 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
582 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
583 } else {
584 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
585 return;
586 }
587 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800588 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200589 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200590 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400591 log.debug("Received packet in: {}", event);
592 controller.postEvent(event);
593 }
594
Carmelo Casconee5b28722018-06-22 17:28:28 +0200595 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
596 // From the spec...
597 // - Election_id: The stream RPC with the highest election_id is the
598 // master. Switch populates with the highest election ID it
599 // has received from all connected controllers.
600 // - Status: Switch populates this with OK for the client that is the
601 // master, and with an error status for all other connected clients (at
602 // every mastership change).
603 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700604 return;
605 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700606 final boolean isMaster =
607 msg.getStatus().getCode() == Status.OK.getCode().value();
608 log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
609 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200610 controller.postEvent(new P4RuntimeEvent(
611 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
612 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700613 isClientMaster.set(isMaster);
Carmelo Casconea966c342017-07-30 01:56:30 -0400614 }
615
steven308017632e152018-10-20 00:51:08 +0800616 private List<PiCounterCell> doReadAllCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700617 List<PiCounterId> counterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700618 return doReadCounterEntities(
619 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
620 pipeconf);
621 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200622
steven308017632e152018-10-20 00:51:08 +0800623 private List<PiCounterCell> doReadCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700624 List<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700625 return doReadCounterEntities(
626 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
627 pipeconf);
628 }
629
steven308017632e152018-10-20 00:51:08 +0800630 private List<PiCounterCell> doReadCounterEntities(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700631 List<Entity> counterEntities, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700632
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800633 final List<Entity> entities = blockingRead(
634 counterEntities, COUNTER_ENTRY, DIRECT_COUNTER_ENTRY)
635 .collect(toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200636
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700637 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200638 }
639
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700640 private boolean doWriteActionProfileMembers(List<PiActionProfileMember> members,
641 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800642 final List<ActionProfileMember> actionProfileMembers;
643 try {
644 actionProfileMembers = CODECS.actionProfileMember()
645 .encodeAllOrFail(members, pipeconf);
646 } catch (CodecException e) {
647 log.warn("Unable to {} action profile members: {}",
648 opType.name(), e.getMessage());
649 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700650 }
Carmelo Casconee44592f2018-09-12 02:24:47 -0700651 final List<Update> updateMsgs = actionProfileMembers.stream()
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800652 .map(m -> Update.newBuilder()
653 .setEntity(Entity.newBuilder()
654 .setActionProfileMember(m)
655 .build())
656 .setType(UPDATE_TYPES.get(opType))
657 .build())
658 .collect(toList());
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700659 return write(updateMsgs, members, opType, "action profile member");
Yi Tseng82512da2017-08-16 19:46:36 -0700660 }
661
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700662 private List<PiActionProfileGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700663 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
664 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200665
666 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700667 if (browser == null) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800668 log.warn(MISSING_P4INFO_BROWSER, pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700669 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700670 }
671
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200672 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700673 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200674 actionProfileId = browser
675 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200676 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200677 .getPreamble()
678 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700679 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200680 log.warn("Unable to dump groups: {}", e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700681 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700682 }
683
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800684 // Read all groups from the given action profile.
685 final Entity entityToRead = Entity.newBuilder()
686 .setActionProfileGroup(
687 ActionProfileGroup.newBuilder()
688 .setActionProfileId(actionProfileId)
689 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700690 .build();
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800691 final List<ActionProfileGroup> groupMsgs = blockingRead(entityToRead, ACTION_PROFILE_GROUP)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200692 .map(Entity::getActionProfileGroup)
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800693 .collect(toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700694
695 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200696 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700697
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800698 return CODECS.actionProfileGroup().decodeAll(groupMsgs, pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700699 }
700
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800701 private List<PiActionProfileMember> doDumpActionProfileMembers(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700702 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
703
704 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
705 if (browser == null) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800706 log.error(MISSING_P4INFO_BROWSER, pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700707 return Collections.emptyList();
708 }
709
710 final int p4ActProfId;
711 try {
712 p4ActProfId = browser
713 .actionProfiles()
714 .getByName(actionProfileId.id())
715 .getPreamble()
716 .getId();
717 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800718 log.warn("Unable to dump action profile members: {}", e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700719 return Collections.emptyList();
720 }
721
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800722 Entity entityToRead = Entity.newBuilder()
723 .setActionProfileMember(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700724 ActionProfileMember.newBuilder()
725 .setActionProfileId(p4ActProfId)
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800726 .build())
Carmelo Casconee44592f2018-09-12 02:24:47 -0700727 .build();
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800728 final List<ActionProfileMember> memberMsgs = blockingRead(entityToRead, ACTION_PROFILE_MEMBER)
Carmelo Casconee44592f2018-09-12 02:24:47 -0700729 .map(Entity::getActionProfileMember)
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800730 .collect(toList());
731
732 log.debug("Retrieved {} members from action profile {} on {}...",
733 memberMsgs.size(), actionProfileId.id(), deviceId);
734
735 return CODECS.actionProfileMember().decodeAll(memberMsgs, pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700736 }
737
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700738 private List<PiActionProfileMemberId> doRemoveActionProfileMembers(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700739 PiActionProfileId actionProfileId,
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700740 List<PiActionProfileMemberId> memberIds,
Carmelo Casconee44592f2018-09-12 02:24:47 -0700741 PiPipeconf pipeconf) {
742
743 if (memberIds.isEmpty()) {
744 return Collections.emptyList();
745 }
746
747 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
748 if (browser == null) {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800749 log.error(MISSING_P4INFO_BROWSER, pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700750 return Collections.emptyList();
751 }
752
753 final int p4ActProfId;
754 try {
755 p4ActProfId = browser.actionProfiles()
756 .getByName(actionProfileId.id()).getPreamble().getId();
757 } catch (P4InfoBrowser.NotFoundException e) {
758 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
759 return Collections.emptyList();
760 }
761
762 final List<Update> updateMsgs = memberIds.stream()
763 .map(m -> ActionProfileMember.newBuilder()
764 .setActionProfileId(p4ActProfId)
765 .setMemberId(m.id()).build())
766 .map(m -> Entity.newBuilder().setActionProfileMember(m).build())
767 .map(e -> Update.newBuilder().setEntity(e)
768 .setType(Update.Type.DELETE).build())
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800769 .collect(toList());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700770
771 log.debug("Removing {} members of action profile '{}'...",
772 memberIds.size(), actionProfileId);
773
774 return writeAndReturnSuccessEntities(
775 updateMsgs, memberIds, WriteOperationType.DELETE,
776 "action profile members");
777 }
778
Carmelo Casconecb4327a2018-09-11 15:17:23 -0700779 private boolean doWriteActionProfileGroup(
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800780 PiActionProfileGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
781 final ActionProfileGroup groupMsg;
Yi Tseng82512da2017-08-16 19:46:36 -0700782 try {
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800783 groupMsg = CODECS.actionProfileGroup().encode(group, pipeconf);
784 } catch (CodecException e) {
785 log.warn("Unable to encode group, aborting {} operation: {}",
786 opType.name(), e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700787 return false;
788 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200789
Carmelo Cascone58136812018-07-19 03:40:16 +0200790 final Update updateMsg = Update.newBuilder()
791 .setEntity(Entity.newBuilder()
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800792 .setActionProfileGroup(groupMsg)
Carmelo Cascone58136812018-07-19 03:40:16 +0200793 .build())
794 .setType(UPDATE_TYPES.get(opType))
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200795 .build();
Carmelo Cascone58136812018-07-19 03:40:16 +0200796
Carmelo Casconee44592f2018-09-12 02:24:47 -0700797 return write(singletonList(updateMsg), singletonList(group),
Carmelo Cascone58136812018-07-19 03:40:16 +0200798 opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700799 }
800
Carmelo Casconee44592f2018-09-12 02:24:47 -0700801 private List<PiMeterCellConfig> doReadAllMeterCells(
802 List<PiMeterId> meterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700803 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
804 meterIds, pipeconf), pipeconf);
805 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900806
Carmelo Casconee44592f2018-09-12 02:24:47 -0700807 private List<PiMeterCellConfig> doReadMeterCells(
808 List<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700809
Carmelo Casconee44592f2018-09-12 02:24:47 -0700810 final List<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900811 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700812 .withMeterCellId(cellId)
813 .build())
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800814 .collect(toList());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900815
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700816 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
817 piMeterCellConfigs, pipeconf), pipeconf);
818 }
819
Carmelo Casconee44592f2018-09-12 02:24:47 -0700820 private List<PiMeterCellConfig> doReadMeterEntities(
821 List<Entity> entitiesToRead, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700822
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800823 final List<Entity> responseEntities = blockingRead(
824 entitiesToRead, METER_ENTRY, DIRECT_METER_ENTRY)
825 .collect(toList());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900826
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700827 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900828 }
829
Carmelo Casconee44592f2018-09-12 02:24:47 -0700830 private boolean doWriteMeterCells(List<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900831
Carmelo Casconee44592f2018-09-12 02:24:47 -0700832 List<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900833 .stream()
834 .map(meterEntryMsg ->
835 Update.newBuilder()
836 .setEntity(meterEntryMsg)
837 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
838 .build())
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800839 .collect(toList());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900840
841 if (updateMsgs.size() == 0) {
842 return true;
843 }
844
Carmelo Cascone58136812018-07-19 03:40:16 +0200845 return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
846 }
847
848 private boolean doWriteMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700849 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +0200850 WriteOperationType opType) {
851
852 final List<Update> updateMsgs = entries.stream()
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -0700853 .map(piEntry -> {
854 try {
855 return MulticastGroupEntryCodec.encode(piEntry);
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800856 } catch (CodecException e) {
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -0700857 log.warn("Unable to encode PiMulticastGroupEntry: {}", e.getMessage());
858 return null;
859 }
860 })
861 .filter(Objects::nonNull)
Carmelo Cascone58136812018-07-19 03:40:16 +0200862 .map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
863 .setMulticastGroupEntry(mcMsg)
864 .build())
865 .map(preMsg -> Entity.newBuilder()
866 .setPacketReplicationEngineEntry(preMsg)
867 .build())
868 .map(entityMsg -> Update.newBuilder()
869 .setEntity(entityMsg)
870 .setType(UPDATE_TYPES.get(opType))
871 .build())
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800872 .collect(toList());
Carmelo Cascone58136812018-07-19 03:40:16 +0200873 return write(updateMsgs, entries, opType, "multicast group entry");
874 }
875
Carmelo Casconee44592f2018-09-12 02:24:47 -0700876 private List<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +0200877
878 final Entity entity = Entity.newBuilder()
879 .setPacketReplicationEngineEntry(
880 PacketReplicationEngineEntry.newBuilder()
881 .setMulticastGroupEntry(
882 MulticastGroupEntry.newBuilder()
883 .build())
884 .build())
885 .build();
886
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800887 final List<PiMulticastGroupEntry> mcEntries = blockingRead(entity, PACKET_REPLICATION_ENGINE_ENTRY)
Carmelo Cascone58136812018-07-19 03:40:16 +0200888 .map(Entity::getPacketReplicationEngineEntry)
889 .filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
890 .map(PacketReplicationEngineEntry::getMulticastGroupEntry)
891 .map(MulticastGroupEntryCodec::decode)
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800892 .collect(toList());
Carmelo Cascone58136812018-07-19 03:40:16 +0200893
894 log.debug("Retrieved {} multicast group entries from {}...",
895 mcEntries.size(), deviceId);
896
897 return mcEntries;
898 }
899
Carmelo Casconee44592f2018-09-12 02:24:47 -0700900 private <T> boolean write(List<Update> updates,
901 List<T> writeEntities,
902 WriteOperationType opType,
903 String entryType) {
904 // True if all entities were successfully written.
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -0700905 return writeAndReturnSuccessEntities(updates, writeEntities, opType, entryType)
906 .size() == writeEntities.size();
Carmelo Casconee44592f2018-09-12 02:24:47 -0700907 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700908
Carmelo Casconee44592f2018-09-12 02:24:47 -0700909 private <T> List<T> writeAndReturnSuccessEntities(
910 List<Update> updates, List<T> writeEntities,
911 WriteOperationType opType, String entryType) {
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -0700912 if (updates.isEmpty()) {
913 return Collections.emptyList();
914 }
915 if (updates.size() != writeEntities.size()) {
916 log.error("Cannot perform {} operation, provided {} " +
917 "update messages for {} {} - BUG?",
918 opType, updates.size(), writeEntities.size(), entryType);
919 return Collections.emptyList();
920 }
Carmelo Casconee44592f2018-09-12 02:24:47 -0700921 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700922 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone58136812018-07-19 03:40:16 +0200923 blockingStub.write(writeRequest(updates));
Carmelo Casconee44592f2018-09-12 02:24:47 -0700924 return writeEntities;
Carmelo Cascone58136812018-07-19 03:40:16 +0200925 } catch (StatusRuntimeException e) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700926 return checkAndLogWriteErrors(writeEntities, e, opType, entryType);
Carmelo Cascone58136812018-07-19 03:40:16 +0200927 }
928 }
929
930 private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
931 return WriteRequest.newBuilder()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900932 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200933 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900934 .addAllUpdates(updateMsgs)
935 .build();
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900936 }
937
Carmelo Cascone99c59db2019-01-17 15:39:35 -0800938 private Stream<Entity> blockingRead(Entity entity, Entity.EntityCase entityCase) {
939 return blockingRead(singletonList(entity), entityCase);
940 }
941
942 private Stream<Entity> blockingRead(Iterable<Entity> entities,
943 Entity.EntityCase... entityCases) {
944 // Build read request making sure we are reading what declared.
945 final ReadRequest.Builder reqBuilder = ReadRequest.newBuilder()
946 .setDeviceId(p4DeviceId);
947 final Set<Entity.EntityCase> entityCaseSet = Sets.newHashSet(entityCases);
948 for (Entity e : entities) {
949 checkArgument(entityCaseSet.contains(e.getEntityCase()),
950 "Entity case mismatch");
951 reqBuilder.addEntities(e);
952 }
953 final ReadRequest readRequest = reqBuilder.build();
954 if (readRequest.getEntitiesCount() == 0) {
955 return Stream.empty();
956 }
957 // Issue read.
958 final Iterator<ReadResponse> responseIterator;
959 try {
960 responseIterator = blockingStub.read(readRequest);
961 } catch (StatusRuntimeException e) {
962 checkGrpcException(e);
963 final String caseString = entityCaseSet.stream()
964 .map(Entity.EntityCase::name)
965 .collect(joining("/"));
966 log.warn("Unable to read {} from {}: {}",
967 caseString, deviceId, e.getMessage());
968 log.debug("Exception during read", e);
969 return Stream.empty();
970 }
971 // Filter results.
972 return Tools.stream(() -> responseIterator)
973 .map(ReadResponse::getEntitiesList)
974 .flatMap(List::stream)
975 .filter(e -> entityCaseSet.contains(e.getEntityCase()));
976 }
977
Yi Tseng2a340f72018-11-02 16:52:47 -0700978 protected Void doShutdown() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700979 streamChannelManager.complete();
Yi Tseng2a340f72018-11-02 16:52:47 -0700980 return super.doShutdown();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400981 }
982
Carmelo Casconee44592f2018-09-12 02:24:47 -0700983 // Returns the collection of succesfully write entities.
984 private <T> List<T> checkAndLogWriteErrors(
985 List<T> writeEntities, StatusRuntimeException ex,
Carmelo Casconee5b28722018-06-22 17:28:28 +0200986 WriteOperationType opType, String entryType) {
987
988 checkGrpcException(ex);
989
Carmelo Cascone943c6642018-09-11 13:01:03 -0700990 final List<P4RuntimeOuterClass.Error> errors = extractWriteErrorDetails(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800991
Carmelo Cascone943c6642018-09-11 13:01:03 -0700992 if (errors.isEmpty()) {
993 final String description = ex.getStatus().getDescription();
994 log.warn("Unable to {} {} {}(s) on {}: {}",
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700995 opType.name(), writeEntities.size(), entryType, deviceId,
996 ex.getStatus().getCode().name(),
Carmelo Cascone943c6642018-09-11 13:01:03 -0700997 description == null ? "" : " - " + description);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700998 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800999 }
1000
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001001 if (errors.size() == writeEntities.size()) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001002 List<T> okEntities = Lists.newArrayList();
1003 Iterator<T> entityIterator = writeEntities.iterator();
1004 for (P4RuntimeOuterClass.Error error : errors) {
1005 T entity = entityIterator.next();
1006 if (error.getCanonicalCode() != Status.OK.getCode().value()) {
1007 log.warn("Unable to {} {} on {}: {} [{}]",
1008 opType.name(), entryType, deviceId,
1009 parseP4Error(error), entity.toString());
1010 } else {
1011 okEntities.add(entity);
1012 }
1013 }
1014 return okEntities;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001015 } else {
Carmelo Casconeb5324e72018-11-25 02:26:32 -08001016 log.warn("Unable to reconcile error details to {} updates " +
Carmelo Casconee44592f2018-09-12 02:24:47 -07001017 "(sent {} updates, but device returned {} errors)",
1018 entryType, writeEntities.size(), errors.size());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001019 errors.stream()
1020 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
1021 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
Carmelo Cascone58136812018-07-19 03:40:16 +02001022 opType.name(), entryType, parseP4Error(err)));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001023 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001024 }
1025 }
1026
1027 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
Carmelo Cascone943c6642018-09-11 13:01:03 -07001028 StatusRuntimeException ex) {
1029 if (!ex.getTrailers().containsKey(STATUS_DETAILS_KEY)) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001030 return Collections.emptyList();
1031 }
Carmelo Cascone943c6642018-09-11 13:01:03 -07001032 com.google.rpc.Status status = ex.getTrailers().get(STATUS_DETAILS_KEY);
1033 if (status == null) {
1034 return Collections.emptyList();
1035 }
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001036 return status.getDetailsList().stream()
1037 .map(any -> {
1038 try {
1039 return any.unpack(P4RuntimeOuterClass.Error.class);
1040 } catch (InvalidProtocolBufferException e) {
1041 log.warn("Unable to unpack P4Runtime Error: {}",
1042 any.toString());
1043 return null;
1044 }
1045 })
1046 .filter(Objects::nonNull)
Carmelo Cascone99c59db2019-01-17 15:39:35 -08001047 .collect(toList());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001048 }
1049
1050 private String parseP4Error(P4RuntimeOuterClass.Error err) {
Carmelo Cascone943c6642018-09-11 13:01:03 -07001051 return format("%s %s%s (%s:%d)",
1052 Status.fromCodeValue(err.getCanonicalCode()).getCode(),
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001053 err.getMessage(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001054 err.hasDetails() ? ", " + err.getDetails().toString() : "",
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001055 err.getSpace(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001056 err.getCode());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001057 }
1058
Carmelo Casconee5b28722018-06-22 17:28:28 +02001059 private void checkGrpcException(StatusRuntimeException ex) {
1060 switch (ex.getStatus().getCode()) {
1061 case OK:
1062 break;
1063 case CANCELLED:
1064 break;
1065 case UNKNOWN:
1066 break;
1067 case INVALID_ARGUMENT:
1068 break;
1069 case DEADLINE_EXCEEDED:
1070 break;
1071 case NOT_FOUND:
1072 break;
1073 case ALREADY_EXISTS:
1074 break;
1075 case PERMISSION_DENIED:
1076 // Notify upper layers that this node is not master.
1077 controller.postEvent(new P4RuntimeEvent(
Carmelo Casconede3b6842018-09-05 17:45:10 -07001078 P4RuntimeEvent.Type.PERMISSION_DENIED,
1079 new BaseP4RuntimeEventSubject(deviceId)));
Carmelo Casconee5b28722018-06-22 17:28:28 +02001080 break;
1081 case RESOURCE_EXHAUSTED:
1082 break;
1083 case FAILED_PRECONDITION:
1084 break;
1085 case ABORTED:
1086 break;
1087 case OUT_OF_RANGE:
1088 break;
1089 case UNIMPLEMENTED:
1090 break;
1091 case INTERNAL:
1092 break;
1093 case UNAVAILABLE:
1094 // Channel might be closed.
1095 controller.postEvent(new P4RuntimeEvent(
1096 P4RuntimeEvent.Type.CHANNEL_EVENT,
1097 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1098 break;
1099 case DATA_LOSS:
1100 break;
1101 case UNAUTHENTICATED:
1102 break;
1103 default:
1104 break;
1105 }
1106 }
1107
1108 private Uint128 bigIntegerToUint128(BigInteger value) {
1109 final byte[] arr = value.toByteArray();
1110 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1111 .put(new byte[Long.BYTES * 2 - arr.length])
1112 .put(arr);
1113 bb.rewind();
1114 return Uint128.newBuilder()
1115 .setHigh(bb.getLong())
1116 .setLow(bb.getLong())
1117 .build();
1118 }
1119
1120 private BigInteger uint128ToBigInteger(Uint128 value) {
1121 return new BigInteger(
1122 ByteBuffer.allocate(Long.BYTES * 2)
1123 .putLong(value.getHigh())
1124 .putLong(value.getLow())
1125 .array());
1126 }
1127
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001128 /**
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001129 * A manager for the P4Runtime stream channel that opportunistically creates
1130 * new stream RCP stubs (e.g. when one fails because of errors) and posts
1131 * channel events via the P4Runtime controller.
1132 */
1133 private final class StreamChannelManager {
1134
1135 private final ManagedChannel channel;
1136 private final AtomicBoolean open;
1137 private final StreamObserver<StreamMessageResponse> responseObserver;
1138 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
1139
1140 private StreamChannelManager(ManagedChannel channel) {
1141 this.channel = channel;
1142 this.responseObserver = new InternalStreamResponseObserver(this);
1143 this.open = new AtomicBoolean(false);
1144 }
1145
1146 private void initIfRequired() {
1147 if (requestObserver == null) {
1148 log.debug("Creating new stream channel for {}...", deviceId);
1149 requestObserver =
1150 (ClientCallStreamObserver<StreamMessageRequest>)
1151 P4RuntimeGrpc.newStub(channel)
1152 .streamChannel(responseObserver);
1153 open.set(false);
1154 }
1155 }
1156
1157 public boolean send(StreamMessageRequest value) {
1158 synchronized (this) {
1159 initIfRequired();
1160 try {
1161 requestObserver.onNext(value);
1162 // FIXME
1163 // signalOpen();
1164 return true;
1165 } catch (Throwable ex) {
1166 if (ex instanceof StatusRuntimeException) {
1167 log.warn("Unable to send {} to {}: {}",
1168 value.getUpdateCase().toString(), deviceId, ex.getMessage());
1169 } else {
1170 log.warn(format(
1171 "Exception while sending %s to %s",
1172 value.getUpdateCase().toString(), deviceId), ex);
1173 }
1174 complete();
1175 return false;
1176 }
1177 }
1178 }
1179
1180 public void complete() {
1181 synchronized (this) {
1182 signalClosed();
1183 if (requestObserver != null) {
1184 requestObserver.onCompleted();
1185 requestObserver.cancel("Terminated", null);
1186 requestObserver = null;
1187 }
1188 }
1189 }
1190
1191 void signalOpen() {
1192 synchronized (this) {
1193 final boolean wasOpen = open.getAndSet(true);
1194 if (!wasOpen) {
1195 controller.postEvent(new P4RuntimeEvent(
1196 P4RuntimeEvent.Type.CHANNEL_EVENT,
1197 new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
1198 }
1199 }
1200 }
1201
1202 void signalClosed() {
1203 synchronized (this) {
1204 final boolean wasOpen = open.getAndSet(false);
1205 if (wasOpen) {
1206 controller.postEvent(new P4RuntimeEvent(
1207 P4RuntimeEvent.Type.CHANNEL_EVENT,
1208 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
1209 }
1210 }
1211 }
1212
1213 public boolean isOpen() {
1214 return open.get();
1215 }
1216 }
1217
1218 /**
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001219 * Handles messages received from the device on the stream channel.
1220 */
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001221 private final class InternalStreamResponseObserver
Carmelo Casconee5b28722018-06-22 17:28:28 +02001222 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001223
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001224 private final StreamChannelManager streamChannelManager;
1225
1226 private InternalStreamResponseObserver(
1227 StreamChannelManager streamChannelManager) {
1228 this.streamChannelManager = streamChannelManager;
1229 }
1230
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001231 @Override
1232 public void onNext(StreamMessageResponse message) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001233 streamChannelManager.signalOpen();
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001234 executorService.submit(() -> doNext(message));
1235 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001236
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001237 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001238 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001239 log.debug("Received message on stream channel from {}: {}",
1240 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001241 switch (message.getUpdateCase()) {
1242 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001243 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001244 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001245 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001246 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001247 return;
1248 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001249 log.warn("Unrecognized stream message from {}: {}",
1250 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001251 }
1252 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001253 log.error("Exception while processing stream message from {}",
1254 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001255 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001256 }
1257
1258 @Override
1259 public void onError(Throwable throwable) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001260 if (throwable instanceof StatusRuntimeException) {
1261 StatusRuntimeException sre = (StatusRuntimeException) throwable;
1262 if (sre.getStatus().getCause() instanceof ConnectException) {
1263 log.warn("Device {} is unreachable ({})",
1264 deviceId, sre.getCause().getMessage());
1265 } else {
1266 log.warn("Received error on stream channel for {}: {}",
1267 deviceId, throwable.getMessage());
1268 }
1269 } else {
1270 log.warn(format("Received exception on stream channel for %s",
1271 deviceId), throwable);
1272 }
1273 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001274 }
1275
1276 @Override
1277 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001278 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001279 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001280 }
1281 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001282}