blob: b4d88a712b2637e1b226d4b5266ad3292fc0e22a [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.Multimap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040023import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080024import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040025import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import io.grpc.ManagedChannel;
27import io.grpc.Status;
28import io.grpc.StatusRuntimeException;
29import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080030import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Carmelo Casconee5b28722018-06-22 17:28:28 +020032import org.onlab.util.SharedExecutors;
Yi Tseng82512da2017-08-16 19:46:36 -070033import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040034import org.onosproject.net.DeviceId;
Carmelo Cascone87892e22017-11-13 16:01:29 -080035import org.onosproject.net.pi.model.PiActionProfileId;
36import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070037import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020038import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080039import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020040import org.onosproject.net.pi.runtime.PiActionGroup;
41import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020042import org.onosproject.net.pi.runtime.PiCounterCellData;
43import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080044import org.onosproject.net.pi.runtime.PiEntity;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090045import org.onosproject.net.pi.runtime.PiMeterCellConfig;
46import org.onosproject.net.pi.runtime.PiMeterCellId;
Carmelo Cascone58136812018-07-19 03:40:16 +020047import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
Andrea Campanella432f7182017-07-14 18:43:27 +020048import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040049import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080050import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040051import org.onosproject.p4runtime.api.P4RuntimeClient;
52import org.onosproject.p4runtime.api.P4RuntimeEvent;
53import org.slf4j.Logger;
Carmelo Casconee5b28722018-06-22 17:28:28 +020054import p4.config.v1.P4InfoOuterClass.P4Info;
55import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020056import p4.v1.P4RuntimeGrpc;
57import p4.v1.P4RuntimeOuterClass;
58import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
59import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
60import p4.v1.P4RuntimeOuterClass.Entity;
61import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
62import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone58136812018-07-19 03:40:16 +020063import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
64import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020065import p4.v1.P4RuntimeOuterClass.ReadRequest;
66import p4.v1.P4RuntimeOuterClass.ReadResponse;
67import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
68import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
69import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
70import p4.v1.P4RuntimeOuterClass.TableEntry;
71import p4.v1.P4RuntimeOuterClass.Uint128;
72import p4.v1.P4RuntimeOuterClass.Update;
73import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040074
Carmelo Casconee5b28722018-06-22 17:28:28 +020075import java.math.BigInteger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070076import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040078import java.util.Collections;
79import java.util.Iterator;
80import java.util.List;
81import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020082import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020083import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040084import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040085import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040086import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040087import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040088import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089import java.util.concurrent.locks.Lock;
90import java.util.concurrent.locks.ReentrantLock;
91import java.util.function.Supplier;
92import java.util.stream.Collectors;
93import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040094
Carmelo Casconed61fdb32017-10-30 10:09:57 -070095import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080096import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040097import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040098import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020099import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
100import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone58136812018-07-19 03:40:16 +0200101import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200102import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200103import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200104import static p4.v1.P4RuntimeOuterClass.PacketOut;
Carmelo Cascone58136812018-07-19 03:40:16 +0200105import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200106import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400107
/**
 * Implementation of a P4Runtime client.
 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200111final class P4RuntimeClientImpl implements P4RuntimeClient {
112
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200113 // Timeout in seconds to obtain the request lock.
114 private static final int LOCK_TIMEOUT = 60;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400115
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400116 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
117 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
118 WriteOperationType.INSERT, Update.Type.INSERT,
119 WriteOperationType.MODIFY, Update.Type.MODIFY,
120 WriteOperationType.DELETE, Update.Type.DELETE
121 );
122
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400123 private final Logger log = getLogger(getClass());
124
Carmelo Casconee5b28722018-06-22 17:28:28 +0200125 private final Lock requestLock = new ReentrantLock();
126 private final Context.CancellableContext cancellableContext =
127 Context.current().withCancellation();
128
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400129 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200130 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400131 private final P4RuntimeControllerImpl controller;
132 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400133 private final ExecutorService executorService;
134 private final Executor contextExecutor;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400135 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400136
Carmelo Casconee5b28722018-06-22 17:28:28 +0200137 // Used by this client for write requests.
138 private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700139
Yi Tseng82512da2017-08-16 19:46:36 -0700140 /**
141 * Default constructor.
142 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200143 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700144 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200145 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700146 * @param controller runtime client controller
147 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200148 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
149 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400150 this.deviceId = deviceId;
151 this.p4DeviceId = p4DeviceId;
152 this.controller = controller;
Carmelo Casconea966c342017-07-30 01:56:30 -0400153 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Casconee5b28722018-06-22 17:28:28 +0200154 "onos-p4runtime-client-" + deviceId.toString(), "%d"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400155 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200156 //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200157 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200158 this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
159 .streamChannel(new StreamChannelResponseObserver());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400160 }
161
162 /**
Carmelo Cascone58136812018-07-19 03:40:16 +0200163 * Submits a task for async execution via the given executor. All tasks
164 * submitted with this method will be executed sequentially.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400165 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200166 private <U> CompletableFuture<U> supplyWithExecutor(
167 Supplier<U> supplier, String opDescription, Executor executor) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400168 return CompletableFuture.supplyAsync(() -> {
169 // TODO: explore a more relaxed locking strategy.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200170 try {
171 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
172 log.error("LOCK TIMEOUT! This is likely a deadlock, "
173 + "please debug (executing {})",
174 opDescription);
175 throw new IllegalThreadStateException("Lock timeout");
176 }
177 } catch (InterruptedException e) {
178 log.warn("Thread interrupted while waiting for lock (executing {})",
179 opDescription);
Ray Milkeydbd38212018-07-02 09:18:09 -0700180 throw new IllegalStateException(e);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200181 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400182 try {
183 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800184 } catch (StatusRuntimeException ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200185 log.warn("Unable to execute {} on {}: {}",
186 opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800187 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400188 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200189 log.error("Exception in client of {}, executing {}",
190 deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400191 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400192 } finally {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200193 requestLock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400194 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200195 }, executor);
196 }
197
198 /**
199 * Equivalent of supplyWithExecutor using the gRPC context executor of this
200 * client, such that if the context is cancelled (e.g. client shutdown) the
201 * RPC is automatically cancelled.
202 */
203 private <U> CompletableFuture<U> supplyInContext(
204 Supplier<U> supplier, String opDescription) {
205 return supplyWithExecutor(supplier, opDescription, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400206 }
207
208 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200209 public CompletableFuture<Boolean> start() {
210 return supplyInContext(this::doInitStreamChannel,
211 "start-initStreamChannel");
212 }
213
214 @Override
215 public CompletableFuture<Void> shutdown() {
216 return supplyWithExecutor(this::doShutdown, "shutdown",
217 SharedExecutors.getPoolThreadExecutor());
218 }
219
220 @Override
221 public CompletableFuture<Boolean> becomeMaster() {
222 return supplyInContext(this::doBecomeMaster,
223 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400224 }
225
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400226 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700227 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
228 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400229 }
230
231 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400232 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
233 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200234 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
235 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400236 }
237
238 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400239 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200240 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400241 }
242
243 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200244 public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
245 return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
246 }
247
248 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200249 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200250 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200251 }
252
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200253 @Override
254 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
255 PiPipeconf pipeconf) {
256 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
257 "readCounterCells-" + cellIds.hashCode());
258 }
259
260 @Override
261 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
262 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700263 return supplyInContext(() -> doReadAllCounterCells(counterIds, pipeconf),
264 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200265 }
266
Yi Tseng82512da2017-08-16 19:46:36 -0700267 @Override
Yi Tseng8d355132018-04-13 01:40:48 +0800268 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
269 Collection<PiActionGroupMember> members,
Yi Tseng82512da2017-08-16 19:46:36 -0700270 WriteOperationType opType,
271 PiPipeconf pipeconf) {
Yi Tseng8d355132018-04-13 01:40:48 +0800272 return supplyInContext(() -> doWriteActionGroupMembers(profileId, members, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700273 "writeActionGroupMembers-" + opType.name());
274 }
275
Yi Tseng8d355132018-04-13 01:40:48 +0800276
Yi Tseng82512da2017-08-16 19:46:36 -0700277 @Override
278 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
279 WriteOperationType opType,
280 PiPipeconf pipeconf) {
281 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
282 "writeActionGroup-" + opType.name());
283 }
284
285 @Override
286 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
287 PiPipeconf pipeconf) {
288 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
289 "dumpGroups-" + actionProfileId.id());
290 }
291
Yi Tseng3e7f1452017-10-20 10:31:53 -0700292 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900293 public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
294
295 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
296 "writeMeterCells");
297 }
298
299 @Override
Carmelo Cascone58136812018-07-19 03:40:16 +0200300 public CompletableFuture<Boolean> writePreMulticastGroupEntries(
301 Collection<PiMulticastGroupEntry> entries,
302 WriteOperationType opType) {
303 return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
304 "writePreMulticastGroupEntries");
305 }
306
307 @Override
308 public CompletableFuture<Collection<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
309 return supplyInContext(this::doReadAllMulticastGroupEntries,
310 "readAllMulticastGroupEntries");
311 }
312
313 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900314 public CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
315 PiPipeconf pipeconf) {
316 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
317 "readMeterCells-" + cellIds.hashCode());
318 }
319
320 @Override
321 public CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
Carmelo Cascone58136812018-07-19 03:40:16 +0200322 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700323 return supplyInContext(() -> doReadAllMeterCells(meterIds, pipeconf),
324 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900325 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700326
/* Blocking method implementations below */
328
Carmelo Casconee5b28722018-06-22 17:28:28 +0200329 private boolean doBecomeMaster() {
330 final Uint128 newId = bigIntegerToUint128(
331 controller.newMasterElectionId(deviceId));
332 if (sendMasterArbitrationUpdate(newId)) {
333 clientElectionId = newId;
334 return true;
335 }
336 return false;
337 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200338
Carmelo Casconee5b28722018-06-22 17:28:28 +0200339 private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
340 log.info("Sending arbitration update to {}... electionId={}",
Carmelo Cascone58136812018-07-19 03:40:16 +0200341 deviceId, uint128ToBigInteger(electionId));
Yi Tseng3e7f1452017-10-20 10:31:53 -0700342 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200343 streamRequestObserver.onNext(
344 StreamMessageRequest.newBuilder()
345 .setArbitration(
346 MasterArbitrationUpdate
347 .newBuilder()
348 .setDeviceId(p4DeviceId)
349 .setElectionId(electionId)
350 .build())
351 .build());
352 return true;
Yi Tsenge67e1412018-01-31 17:35:20 -0800353 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800354 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tseng3e7f1452017-10-20 10:31:53 -0700355 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200356 return false;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700357 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200358
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400359 private boolean doInitStreamChannel() {
360 // To listen for packets and other events, we need to start the RPC.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200361 // Here we send an empty StreamMessageRequest.
362 try {
363 log.info("Starting stream channel with {}...", deviceId);
364 streamRequestObserver.onNext(StreamMessageRequest.newBuilder().build());
365 return true;
366 } catch (StatusRuntimeException e) {
367 log.error("Unable to start stream channel with {}: {}",
368 deviceId, e.getMessage());
369 return false;
370 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400371 }
372
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700373 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400374
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700375 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
376
377 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400378
379 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
380 if (p4Info == null) {
381 // Problem logged by PipeconfHelper.
382 return false;
383 }
384
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700385 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
386 .newBuilder()
387 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
388 .setReassign(true)
389 .setDeviceData(ByteString.copyFrom(deviceData))
390 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400391
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700392 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200393 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700394 .setP4Info(p4Info)
395 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
396 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400397
398 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
399 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100400 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200401 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400402 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100403 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400404 .build();
405
406 try {
407 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700408 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400409 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800410 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400411 return false;
412 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400413 }
414
415 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
416 PiPipeconf pipeconf) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800417 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400418 return true;
419 }
420
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700421 Collection<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800422 try {
423 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
424 .stream()
425 .map(tableEntryMsg ->
426 Update.newBuilder()
427 .setEntity(Entity.newBuilder()
428 .setTableEntry(tableEntryMsg)
429 .build())
430 .setType(UPDATE_TYPES.get(opType))
431 .build())
432 .collect(Collectors.toList());
433 } catch (EncodeException e) {
434 log.error("Unable to encode table entries, aborting {} operation: {}",
435 opType.name(), e.getMessage());
436 return false;
437 }
438
Carmelo Cascone58136812018-07-19 03:40:16 +0200439 return write(updateMsgs, piTableEntries, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400440 }
441
442 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
443
Carmelo Cascone9f007702017-08-24 13:30:51 +0200444 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400445
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400446 int tableId;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200447 if (piTableId == null) {
448 // Dump all tables.
449 tableId = 0;
450 } else {
451 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200452 if (browser == null) {
453 log.warn("Unable to get a P4Info browser for pipeconf {}", pipeconf);
454 return Collections.emptyList();
455 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200456 try {
457 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
458 } catch (P4InfoBrowser.NotFoundException e) {
459 log.warn("Unable to dump table: {}", e.getMessage());
460 return Collections.emptyList();
461 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400462 }
463
464 ReadRequest requestMsg = ReadRequest.newBuilder()
465 .setDeviceId(p4DeviceId)
466 .addEntities(Entity.newBuilder()
467 .setTableEntry(TableEntry.newBuilder()
468 .setTableId(tableId)
469 .build())
470 .build())
471 .build();
472
473 Iterator<ReadResponse> responses;
474 try {
475 responses = blockingStub.read(requestMsg);
476 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800477 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400478 return Collections.emptyList();
479 }
480
481 Iterable<ReadResponse> responseIterable = () -> responses;
482 List<TableEntry> tableEntryMsgs = StreamSupport
483 .stream(responseIterable.spliterator(), false)
484 .map(ReadResponse::getEntitiesList)
485 .flatMap(List::stream)
486 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
487 .map(Entity::getTableEntry)
488 .collect(Collectors.toList());
489
Carmelo Cascone9f007702017-08-24 13:30:51 +0200490 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400491
492 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
493 }
494
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200495 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
496 try {
497 //encode the PiPacketOperation into a PacketOut
498 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
499
500 //Build the request
501 StreamMessageRequest packetOutRequest = StreamMessageRequest
502 .newBuilder().setPacket(packetOut).build();
503
504 //Send the request
505 streamRequestObserver.onNext(packetOutRequest);
506
507 } catch (P4InfoBrowser.NotFoundException e) {
508 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
509 log.debug("Exception", e);
510 return false;
511 }
512 return true;
513 }
514
Carmelo Casconea966c342017-07-30 01:56:30 -0400515 private void doPacketIn(PacketIn packetInMsg) {
516
517 // Retrieve the pipeconf for this client's device.
518 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
519 if (pipeconfService == null) {
520 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
521 }
522 final PiPipeconf pipeconf;
523 if (pipeconfService.ofDevice(deviceId).isPresent() &&
524 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
525 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
526 } else {
527 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
528 return;
529 }
530 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800531 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200532 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200533 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400534 log.debug("Received packet in: {}", event);
535 controller.postEvent(event);
536 }
537
Carmelo Casconee5b28722018-06-22 17:28:28 +0200538 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
539 // From the spec...
540 // - Election_id: The stream RPC with the highest election_id is the
541 // master. Switch populates with the highest election ID it
542 // has received from all connected controllers.
543 // - Status: Switch populates this with OK for the client that is the
544 // master, and with an error status for all other connected clients (at
545 // every mastership change).
546 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700547 return;
548 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200549 final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
550 log.info("Received arbitration update from {}: isMaster={}, electionId={}",
551 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
552 controller.postEvent(new P4RuntimeEvent(
553 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
554 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Casconea966c342017-07-30 01:56:30 -0400555 }
556
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700557 private Collection<PiCounterCellData> doReadAllCounterCells(
558 Collection<PiCounterId> counterIds, PiPipeconf pipeconf) {
559 return doReadCounterEntities(
560 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
561 pipeconf);
562 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200563
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700564 private Collection<PiCounterCellData> doReadCounterCells(
565 Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
566 return doReadCounterEntities(
567 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
568 pipeconf);
569 }
570
571 private Collection<PiCounterCellData> doReadCounterEntities(
572 Collection<Entity> counterEntities, PiPipeconf pipeconf) {
573
574 if (counterEntities.size() == 0) {
575 return Collections.emptyList();
576 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200577
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200578 final ReadRequest request = ReadRequest.newBuilder()
579 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700580 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200581 .build();
582
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200583 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200584 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200585 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200586 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800587 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200588 return Collections.emptyList();
589 }
590
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200591 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200592 .map(ReadResponse::getEntitiesList)
593 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200594 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200595
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700596 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200597 }
598
Yi Tseng8d355132018-04-13 01:40:48 +0800599 private boolean doWriteActionGroupMembers(PiActionProfileId profileId, Collection<PiActionGroupMember> members,
600 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200601 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800602
Yi Tseng8d355132018-04-13 01:40:48 +0800603 for (PiActionGroupMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800604 try {
Yi Tseng8d355132018-04-13 01:40:48 +0800605 actionProfileMembers.add(ActionProfileMemberEncoder.encode(profileId, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800606 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
607 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
608 opType.name(), e.getMessage(), member.toString());
609 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700610 }
Yi Tseng82512da2017-08-16 19:46:36 -0700611 }
612
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200613 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700614 .map(actionProfileMember ->
615 Update.newBuilder()
616 .setEntity(Entity.newBuilder()
617 .setActionProfileMember(actionProfileMember)
618 .build())
619 .setType(UPDATE_TYPES.get(opType))
620 .build())
621 .collect(Collectors.toList());
622
623 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200624 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700625 return true;
626 }
627
Carmelo Cascone58136812018-07-19 03:40:16 +0200628 return write(updateMsgs, members, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700629 }
630
631 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
632 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
633 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200634
635 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700636 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200637 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700638 return Collections.emptySet();
639 }
640
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200641 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700642 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200643 actionProfileId = browser
644 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200645 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200646 .getPreamble()
647 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700648 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200649 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700650 return Collections.emptySet();
651 }
652
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200653 // Prepare read request to read all groups from the given action profile.
654 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700655 .setDeviceId(p4DeviceId)
656 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200657 .setActionProfileGroup(
658 ActionProfileGroup.newBuilder()
659 .setActionProfileId(actionProfileId)
660 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700661 .build())
662 .build();
663
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200664 // Read groups.
665 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700666 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200667 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700668 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800669 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700670 return Collections.emptySet();
671 }
672
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200673 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
674 .map(ReadResponse::getEntitiesList)
675 .flatMap(List::stream)
676 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
677 .map(Entity::getActionProfileGroup)
678 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700679
680 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200681 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700682
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200683 // Returned groups contain only a minimal description of their members.
684 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700685
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200686 // Keep a map of all member IDs for each group ID, will need it later.
687 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
688 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
689 g.getGroupId(),
690 g.getMembersList().stream()
691 .map(ActionProfileGroup.Member::getMemberId)
692 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700693
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200694 // Prepare one big read request to read all members in one shot.
695 final Set<Entity> entityMsgs = groupMsgs.stream()
696 .flatMap(g -> g.getMembersList().stream())
697 .map(ActionProfileGroup.Member::getMemberId)
698 // Prevent issuing many read requests for the same member.
699 .distinct()
700 .map(id -> ActionProfileMember.newBuilder()
701 .setActionProfileId(actionProfileId)
702 .setMemberId(id)
703 .build())
704 .map(m -> Entity.newBuilder()
705 .setActionProfileMember(m)
706 .build())
707 .collect(Collectors.toSet());
708 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
709 .addAllEntities(entityMsgs)
710 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700711
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200712 // Read members.
713 final Iterator<ReadResponse> memberResponses;
714 try {
715 memberResponses = blockingStub.read(memberRequestMsg);
716 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800717 log.warn("Unable to read members of action profile {} from {}: {}",
718 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200719 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700720 }
721
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200722 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
723 Tools.stream(() -> memberResponses)
724 .map(ReadResponse::getEntitiesList)
725 .flatMap(List::stream)
726 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
727 .map(Entity::getActionProfileMember)
728 .forEach(member -> groupIdToMemberIdsMap.asMap()
729 // Get all group IDs that contain this member.
730 .entrySet()
731 .stream()
732 .filter(entry -> entry.getValue().contains(member.getMemberId()))
733 .map(Map.Entry::getKey)
734 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
735
736 log.debug("Retrieved {} group members from action profile {} on {}...",
737 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
738
739 return groupMsgs.stream()
740 .map(groupMsg -> {
741 try {
742 return ActionProfileGroupEncoder.decode(groupMsg,
743 groupIdToMembersMap.get(groupMsg.getGroupId()),
744 pipeconf);
745 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
746 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
747 return null;
748 }
749 })
750 .filter(Objects::nonNull)
751 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700752 }
753
754 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200755 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700756 try {
757 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
758 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800759 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700760 return false;
761 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200762
Carmelo Cascone58136812018-07-19 03:40:16 +0200763 final Update updateMsg = Update.newBuilder()
764 .setEntity(Entity.newBuilder()
765 .setActionProfileGroup(actionProfileGroup)
766 .build())
767 .setType(UPDATE_TYPES.get(opType))
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200768 .build();
Carmelo Cascone58136812018-07-19 03:40:16 +0200769
770 return write(Collections.singleton(updateMsg), Collections.singleton(group),
771 opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700772 }
773
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700774 private Collection<PiMeterCellConfig> doReadAllMeterCells(
775 Collection<PiMeterId> meterIds, PiPipeconf pipeconf) {
776 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
777 meterIds, pipeconf), pipeconf);
778 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900779
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700780 private Collection<PiMeterCellConfig> doReadMeterCells(
781 Collection<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
782
783 final Collection<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900784 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700785 .withMeterCellId(cellId)
786 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900787 .collect(Collectors.toList());
788
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700789 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
790 piMeterCellConfigs, pipeconf), pipeconf);
791 }
792
793 private Collection<PiMeterCellConfig> doReadMeterEntities(
794 Collection<Entity> entitiesToRead, PiPipeconf pipeconf) {
795
796 if (entitiesToRead.size() == 0) {
797 return Collections.emptyList();
798 }
799
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900800 final ReadRequest request = ReadRequest.newBuilder()
801 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700802 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900803 .build();
804
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900805 final Iterable<ReadResponse> responses;
806 try {
807 responses = () -> blockingStub.read(request);
808 } catch (StatusRuntimeException e) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700809 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900810 log.debug("exception", e);
811 return Collections.emptyList();
812 }
813
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700814 List<Entity> responseEntities = StreamSupport
815 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900816 .map(ReadResponse::getEntitiesList)
817 .flatMap(List::stream)
818 .collect(Collectors.toList());
819
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700820 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900821 }
822
Carmelo Cascone58136812018-07-19 03:40:16 +0200823 private boolean doWriteMeterCells(Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900824
Carmelo Cascone58136812018-07-19 03:40:16 +0200825 Collection<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900826 .stream()
827 .map(meterEntryMsg ->
828 Update.newBuilder()
829 .setEntity(meterEntryMsg)
830 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
831 .build())
832 .collect(Collectors.toList());
833
834 if (updateMsgs.size() == 0) {
835 return true;
836 }
837
Carmelo Cascone58136812018-07-19 03:40:16 +0200838 return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
839 }
840
841 private boolean doWriteMulticastGroupEntries(
842 Collection<PiMulticastGroupEntry> entries,
843 WriteOperationType opType) {
844
845 final List<Update> updateMsgs = entries.stream()
846 .map(MulticastGroupEntryCodec::encode)
847 .map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
848 .setMulticastGroupEntry(mcMsg)
849 .build())
850 .map(preMsg -> Entity.newBuilder()
851 .setPacketReplicationEngineEntry(preMsg)
852 .build())
853 .map(entityMsg -> Update.newBuilder()
854 .setEntity(entityMsg)
855 .setType(UPDATE_TYPES.get(opType))
856 .build())
857 .collect(Collectors.toList());
858 return write(updateMsgs, entries, opType, "multicast group entry");
859 }
860
861 private Collection<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
862
863 final Entity entity = Entity.newBuilder()
864 .setPacketReplicationEngineEntry(
865 PacketReplicationEngineEntry.newBuilder()
866 .setMulticastGroupEntry(
867 MulticastGroupEntry.newBuilder()
868 .build())
869 .build())
870 .build();
871
872 final ReadRequest req = ReadRequest.newBuilder()
873 .setDeviceId(p4DeviceId)
874 .addEntities(entity)
875 .build();
876
877 Iterator<ReadResponse> responses;
878 try {
879 responses = blockingStub.read(req);
880 } catch (StatusRuntimeException e) {
881 log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
882 return Collections.emptyList();
883 }
884
885 Iterable<ReadResponse> responseIterable = () -> responses;
886 final List<PiMulticastGroupEntry> mcEntries = StreamSupport
887 .stream(responseIterable.spliterator(), false)
888 .map(ReadResponse::getEntitiesList)
889 .flatMap(List::stream)
890 .filter(e -> e.getEntityCase()
891 .equals(PACKET_REPLICATION_ENGINE_ENTRY))
892 .map(Entity::getPacketReplicationEngineEntry)
893 .filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
894 .map(PacketReplicationEngineEntry::getMulticastGroupEntry)
895 .map(MulticastGroupEntryCodec::decode)
896 .collect(Collectors.toList());
897
898 log.debug("Retrieved {} multicast group entries from {}...",
899 mcEntries.size(), deviceId);
900
901 return mcEntries;
902 }
903
904 private <E extends PiEntity> boolean write(Collection<Update> updates,
905 Collection<E> writeEntities,
906 WriteOperationType opType,
907 String entryType) {
908 try {
909 blockingStub.write(writeRequest(updates));
910 return true;
911 } catch (StatusRuntimeException e) {
912 checkAndLogWriteErrors(writeEntities, e, opType, entryType);
913 return false;
914 }
915 }
916
917 private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
918 return WriteRequest.newBuilder()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900919 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200920 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900921 .addAllUpdates(updateMsgs)
922 .build();
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900923 }
924
Carmelo Casconee5b28722018-06-22 17:28:28 +0200925 private Void doShutdown() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400926 log.info("Shutting down client for {}...", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200927 if (streamRequestObserver != null) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400928 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200929 streamRequestObserver.onCompleted();
930 } catch (IllegalStateException e) {
931 // Thrown if stream channel is already completed. Can ignore.
932 log.debug("Ignored expection: {}", e);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400933 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200934 cancellableContext.cancel(new InterruptedException(
935 "Requested client shutdown"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400936 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200937 this.executorService.shutdown();
938 try {
939 executorService.awaitTermination(5, TimeUnit.SECONDS);
940 } catch (InterruptedException e) {
941 log.warn("Executor service didn't shutdown in time.");
942 Thread.currentThread().interrupt();
943 }
944 return null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400945 }
946
Carmelo Casconee5b28722018-06-22 17:28:28 +0200947 private <E extends PiEntity> void checkAndLogWriteErrors(
948 Collection<E> writeEntities, StatusRuntimeException ex,
949 WriteOperationType opType, String entryType) {
950
951 checkGrpcException(ex);
952
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800953 List<P4RuntimeOuterClass.Error> errors = null;
954 String description = null;
955 try {
956 errors = extractWriteErrorDetails(ex);
957 } catch (InvalidProtocolBufferException e) {
958 description = ex.getStatus().getDescription();
959 }
960
961 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
962 opType.name(), writeEntities.size(), entryType, deviceId,
963 ex.getStatus().getCode().name(),
964 description == null ? "" : " - " + description);
965
966 if (errors == null || errors.isEmpty()) {
967 return;
968 }
969
970 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
971 // and that errors are reported in the same order as the corresponding
972 // written entity. Write RPC methods should be refactored to accept an
973 // order list of entities, instead of a collection.
974 if (errors.size() == writeEntities.size()) {
975 Iterator<E> entityIterator = writeEntities.iterator();
976 errors.stream()
977 .map(e -> ImmutablePair.of(e, entityIterator.next()))
978 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
979 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
980 opType.name(), entryType, parseP4Error(p.getLeft()),
981 p.getRight().toString()));
982 } else {
983 log.error("Unable to reconcile error details to updates " +
984 "(sent {} updates, but device returned {} errors)",
985 entryType, writeEntities.size(), errors.size());
986 errors.stream()
987 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
988 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
Carmelo Cascone58136812018-07-19 03:40:16 +0200989 opType.name(), entryType, parseP4Error(err)));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800990 }
991 }
992
993 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
994 StatusRuntimeException ex) throws InvalidProtocolBufferException {
995 String statusString = ex.getStatus().getDescription();
996 if (statusString == null) {
997 return Collections.emptyList();
998 }
999 com.google.rpc.Status status = com.google.rpc.Status
1000 .parseFrom(statusString.getBytes());
1001 return status.getDetailsList().stream()
1002 .map(any -> {
1003 try {
1004 return any.unpack(P4RuntimeOuterClass.Error.class);
1005 } catch (InvalidProtocolBufferException e) {
1006 log.warn("Unable to unpack P4Runtime Error: {}",
1007 any.toString());
1008 return null;
1009 }
1010 })
1011 .filter(Objects::nonNull)
1012 .collect(Collectors.toList());
1013
1014 }
1015
1016 private String parseP4Error(P4RuntimeOuterClass.Error err) {
1017 return format("%s %s (%s code %d)%s",
1018 Status.fromCodeValue(err.getCanonicalCode()),
1019 err.getMessage(),
1020 err.getSpace(),
1021 err.getCode(),
1022 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
1023 }
1024
Carmelo Casconee5b28722018-06-22 17:28:28 +02001025 private void checkGrpcException(StatusRuntimeException ex) {
1026 switch (ex.getStatus().getCode()) {
1027 case OK:
1028 break;
1029 case CANCELLED:
1030 break;
1031 case UNKNOWN:
1032 break;
1033 case INVALID_ARGUMENT:
1034 break;
1035 case DEADLINE_EXCEEDED:
1036 break;
1037 case NOT_FOUND:
1038 break;
1039 case ALREADY_EXISTS:
1040 break;
1041 case PERMISSION_DENIED:
1042 // Notify upper layers that this node is not master.
1043 controller.postEvent(new P4RuntimeEvent(
1044 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
1045 new ArbitrationResponse(deviceId, false)));
1046 break;
1047 case RESOURCE_EXHAUSTED:
1048 break;
1049 case FAILED_PRECONDITION:
1050 break;
1051 case ABORTED:
1052 break;
1053 case OUT_OF_RANGE:
1054 break;
1055 case UNIMPLEMENTED:
1056 break;
1057 case INTERNAL:
1058 break;
1059 case UNAVAILABLE:
1060 // Channel might be closed.
1061 controller.postEvent(new P4RuntimeEvent(
1062 P4RuntimeEvent.Type.CHANNEL_EVENT,
1063 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1064 break;
1065 case DATA_LOSS:
1066 break;
1067 case UNAUTHENTICATED:
1068 break;
1069 default:
1070 break;
1071 }
1072 }
1073
1074 private Uint128 bigIntegerToUint128(BigInteger value) {
1075 final byte[] arr = value.toByteArray();
1076 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1077 .put(new byte[Long.BYTES * 2 - arr.length])
1078 .put(arr);
1079 bb.rewind();
1080 return Uint128.newBuilder()
1081 .setHigh(bb.getLong())
1082 .setLow(bb.getLong())
1083 .build();
1084 }
1085
1086 private BigInteger uint128ToBigInteger(Uint128 value) {
1087 return new BigInteger(
1088 ByteBuffer.allocate(Long.BYTES * 2)
1089 .putLong(value.getHigh())
1090 .putLong(value.getLow())
1091 .array());
1092 }
1093
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001094 /**
1095 * Handles messages received from the device on the stream channel.
1096 */
Carmelo Casconee5b28722018-06-22 17:28:28 +02001097 private class StreamChannelResponseObserver
1098 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001099
1100 @Override
1101 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001102 executorService.submit(() -> doNext(message));
1103 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001104
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001105 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001106 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001107 log.debug("Received message on stream channel from {}: {}",
1108 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001109 switch (message.getUpdateCase()) {
1110 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001111 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001112 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001113 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001114 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001115 return;
1116 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001117 log.warn("Unrecognized stream message from {}: {}",
1118 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001119 }
1120 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001121 log.error("Exception while processing stream message from {}",
1122 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001123 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001124 }
1125
1126 @Override
1127 public void onError(Throwable throwable) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001128 log.warn("Error on stream channel for {}: {}",
1129 deviceId, Status.fromThrowable(throwable));
1130 controller.postEvent(new P4RuntimeEvent(
1131 P4RuntimeEvent.Type.CHANNEL_EVENT,
1132 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001133 }
1134
1135 @Override
1136 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001137 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +02001138 controller.postEvent(new P4RuntimeEvent(
1139 P4RuntimeEvent.Type.CHANNEL_EVENT,
1140 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001141 }
1142 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001143}