blob: 905a690b608ce48516539a0d8ef573213d182724 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020024import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040025import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080026import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040027import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040028import io.grpc.ManagedChannel;
29import io.grpc.Status;
30import io.grpc.StatusRuntimeException;
31import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020032import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070033import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040034import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070035import org.onosproject.net.MastershipRole;
Carmelo Cascone87892e22017-11-13 16:01:29 -080036import org.onosproject.net.pi.model.PiActionProfileId;
37import org.onosproject.net.pi.model.PiCounterId;
38import org.onosproject.net.pi.model.PiCounterType;
Andrea Campanella432f7182017-07-14 18:43:27 +020039import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080040import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020041import org.onosproject.net.pi.runtime.PiActionGroup;
42import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020043import org.onosproject.net.pi.runtime.PiCounterCellData;
44import org.onosproject.net.pi.runtime.PiCounterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020045import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040046import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080047import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040048import org.onosproject.p4runtime.api.P4RuntimeClient;
49import org.onosproject.p4runtime.api.P4RuntimeEvent;
50import org.slf4j.Logger;
51import p4.P4RuntimeGrpc;
Yi Tseng82512da2017-08-16 19:46:36 -070052import p4.P4RuntimeOuterClass.ActionProfileGroup;
53import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040054import p4.P4RuntimeOuterClass.Entity;
55import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
56import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
57import p4.P4RuntimeOuterClass.PacketIn;
58import p4.P4RuntimeOuterClass.ReadRequest;
59import p4.P4RuntimeOuterClass.ReadResponse;
60import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
61import p4.P4RuntimeOuterClass.StreamMessageRequest;
62import p4.P4RuntimeOuterClass.StreamMessageResponse;
63import p4.P4RuntimeOuterClass.TableEntry;
Yi Tseng3e7f1452017-10-20 10:31:53 -070064import p4.P4RuntimeOuterClass.Uint128;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040065import p4.P4RuntimeOuterClass.Update;
66import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020067import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040068import p4.tmp.P4Config;
69
Carmelo Casconed61fdb32017-10-30 10:09:57 -070070import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072import java.util.Collections;
73import java.util.Iterator;
74import java.util.List;
75import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020076import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020077import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040078import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070079import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040080import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040081import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040082import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040083import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040084import java.util.concurrent.locks.Lock;
85import java.util.concurrent.locks.ReentrantLock;
86import java.util.function.Supplier;
87import java.util.stream.Collectors;
88import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040089
Carmelo Casconed61fdb32017-10-30 10:09:57 -070090import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040091import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040092import static org.slf4j.LoggerFactory.getLogger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070093import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
94import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
95import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020096import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040097import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
98
99/**
100 * Implementation of a P4Runtime client.
101 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400102public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400103
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400104 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
105 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
106 WriteOperationType.INSERT, Update.Type.INSERT,
107 WriteOperationType.MODIFY, Update.Type.MODIFY,
108 WriteOperationType.DELETE, Update.Type.DELETE
109 );
110
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400111 private final Logger log = getLogger(getClass());
112
113 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200114 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400115 private final P4RuntimeControllerImpl controller;
116 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400117 private final Context.CancellableContext cancellableContext;
118 private final ExecutorService executorService;
119 private final Executor contextExecutor;
120 private final Lock writeLock = new ReentrantLock();
121 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400122
Yi Tseng3e7f1452017-10-20 10:31:53 -0700123 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
124 protected Uint128 p4RuntimeElectionId;
125
Yi Tseng82512da2017-08-16 19:46:36 -0700126 /**
127 * Default constructor.
128 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200129 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700130 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200131 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700132 * @param controller runtime client controller
133 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200134 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
135 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400136 this.deviceId = deviceId;
137 this.p4DeviceId = p4DeviceId;
138 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400139 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400140 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400141 "onos/p4runtime-client-" + deviceId.toString(),
142 deviceId.toString() + "-%d"));
143 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200144 //TODO Investigate deadline or timeout in supplyInContext Method
145 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400146 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
147 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
148 }
149
150 /**
151 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
152 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
153 * <p>
154 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
155 * <p>
156 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200157 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400158 return CompletableFuture.supplyAsync(() -> {
159 // TODO: explore a more relaxed locking strategy.
160 writeLock.lock();
161 try {
162 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800163 } catch (StatusRuntimeException ex) {
164 logP4RuntimeErrorStatus(ex, opDescription);
165 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400166 } catch (Throwable ex) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800167 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400168 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400169 } finally {
170 writeLock.unlock();
171 }
172 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400173 }
174
Yi Tsenge67e1412018-01-31 17:35:20 -0800175 private void logP4RuntimeErrorStatus(StatusRuntimeException ex, String description) {
176 String statusString = ex.getStatus().getDescription();
177 try {
178 com.google.rpc.Status status = com.google.rpc.Status.parseFrom(statusString.getBytes());
179 log.warn("{} failed on {} due to {}", description, deviceId, status.toString());
180 } catch (InvalidProtocolBufferException e) {
181 log.warn("{} failed on {} due to {}", description, deviceId, statusString);
182 } catch (NullPointerException e) {
183 log.warn("{} failed on {}", description, deviceId);
184 }
185 }
186
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400187 @Override
188 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200189 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400190 }
191
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400192 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700193 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
194 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400195 }
196
197 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400198 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
199 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200200 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
201 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400202 }
203
204 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400205 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200206 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400207 }
208
209 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200210 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200211 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200212 }
213
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200214 @Override
215 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
216 PiPipeconf pipeconf) {
217 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
218 "readCounterCells-" + cellIds.hashCode());
219 }
220
221 @Override
222 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
223 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200224
225 /*
226 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
227 CounterEntry:
228 - All counter cells for all meters if counter_id = 0 (default).
229 - All counter cells for given counter_id if index = 0 (default).
230 DirectCounterEntry:
231 - All counter cells for all meters if counter_id = 0 (default).
232 - All counter cells for given counter_id if table_entry.match is empty.
233 */
234
235 Set<PiCounterCellId> cellIds = Sets.newHashSet();
236
237 for (PiCounterId counterId : counterIds) {
Carmelo Cascone87892e22017-11-13 16:01:29 -0800238 if (!pipeconf.pipelineModel().counter(counterId).isPresent()) {
239 log.warn("Unable to find counter '{}' in pipeline model", counterId);
240 continue;
241 }
242 PiCounterType counterType = pipeconf.pipelineModel().counter(counterId).get().counterType();
243 switch (counterType) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200244 case INDIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800245 cellIds.add(PiCounterCellId.ofIndirect(counterId, 0));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200246 break;
247 case DIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800248 cellIds.add(PiCounterCellId.ofDirect(counterId, PiTableEntry.EMTPY));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200249 break;
250 default:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800251 log.warn("Unrecognized PI counter type '{}'", counterType);
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200252 }
253 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200254
255 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
256 "readAllCounterCells-" + cellIds.hashCode());
257 }
258
Yi Tseng82512da2017-08-16 19:46:36 -0700259 @Override
260 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700261 WriteOperationType opType,
262 PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200263 return supplyInContext(() -> doWriteActionGroupMembers(group, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700264 "writeActionGroupMembers-" + opType.name());
265 }
266
267 @Override
268 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
269 WriteOperationType opType,
270 PiPipeconf pipeconf) {
271 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
272 "writeActionGroup-" + opType.name());
273 }
274
275 @Override
276 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
277 PiPipeconf pipeconf) {
278 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
279 "dumpGroups-" + actionProfileId.id());
280 }
281
Yi Tseng3e7f1452017-10-20 10:31:53 -0700282 @Override
283 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
284 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
285 }
286
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400287 /* Blocking method implementations below */
288
Yi Tseng3e7f1452017-10-20 10:31:53 -0700289 private boolean doArbitrationUpdate() {
290 CompletableFuture<Boolean> result = new CompletableFuture<>();
291 // TODO: currently we use 64-bit Long type for election id, should
292 // we use 128-bit ?
293 long nextElectId = controller.getNewMasterElectionId();
294 Uint128 newElectionId = Uint128.newBuilder()
295 .setLow(nextElectId)
296 .build();
297 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
298 .setDeviceId(p4DeviceId)
299 .setElectionId(newElectionId)
300 .build();
301 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
302 .setArbitration(arbitrationUpdate)
303 .build();
304 log.debug("Sending arbitration update to {} with election id {}...",
305 deviceId, newElectionId);
306 arbitrationUpdateMap.put(newElectionId, result);
307 try {
308 streamRequestObserver.onNext(requestMsg);
309 return result.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800310 } catch (StatusRuntimeException e) {
311 logP4RuntimeErrorStatus(e, "Arbitration update");
312 arbitrationUpdateMap.remove(newElectionId);
313 return false;
314 } catch (InterruptedException | ExecutionException e) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700315 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
316 arbitrationUpdateMap.remove(newElectionId);
317 return false;
318 }
319 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400320 private boolean doInitStreamChannel() {
321 // To listen for packets and other events, we need to start the RPC.
322 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700323 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400324 }
325
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700326 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400327
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700328 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
329
330 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400331
332 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
333 if (p4Info == null) {
334 // Problem logged by PipeconfHelper.
335 return false;
336 }
337
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700338 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
339 .newBuilder()
340 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
341 .setReassign(true)
342 .setDeviceData(ByteString.copyFrom(deviceData))
343 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400344
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700345 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200346 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700347 .setP4Info(p4Info)
348 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
349 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400350
351 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
352 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100353 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700354 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400355 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100356 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400357 .build();
358
359 try {
360 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700361 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400362 } catch (StatusRuntimeException ex) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800363 logP4RuntimeErrorStatus(ex, "Set pipeline config");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400364 return false;
365 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400366 }
367
368 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
369 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400370 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
371
372 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
373 .stream()
374 .map(tableEntryMsg ->
375 Update.newBuilder()
376 .setEntity(Entity.newBuilder()
377 .setTableEntry(tableEntryMsg)
378 .build())
379 .setType(UPDATE_TYPES.get(opType))
380 .build())
381 .collect(Collectors.toList());
382
383 if (updateMsgs.size() == 0) {
384 return true;
385 }
386
387 writeRequestBuilder
388 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700389 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400390 .addAllUpdates(updateMsgs)
391 .build();
392
393 try {
394 blockingStub.write(writeRequestBuilder.build());
395 return true;
396 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800397 logP4RuntimeErrorStatus(e, "Write table entries");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400398 return false;
399 }
400 }
401
402 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
403
Carmelo Cascone9f007702017-08-24 13:30:51 +0200404 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400405
406 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
407 int tableId;
408 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200409 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400410 } catch (P4InfoBrowser.NotFoundException e) {
411 log.warn("Unable to dump table: {}", e.getMessage());
412 return Collections.emptyList();
413 }
414
415 ReadRequest requestMsg = ReadRequest.newBuilder()
416 .setDeviceId(p4DeviceId)
417 .addEntities(Entity.newBuilder()
418 .setTableEntry(TableEntry.newBuilder()
419 .setTableId(tableId)
420 .build())
421 .build())
422 .build();
423
424 Iterator<ReadResponse> responses;
425 try {
426 responses = blockingStub.read(requestMsg);
427 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800428 logP4RuntimeErrorStatus(e, "Dump table");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400429 return Collections.emptyList();
430 }
431
432 Iterable<ReadResponse> responseIterable = () -> responses;
433 List<TableEntry> tableEntryMsgs = StreamSupport
434 .stream(responseIterable.spliterator(), false)
435 .map(ReadResponse::getEntitiesList)
436 .flatMap(List::stream)
437 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
438 .map(Entity::getTableEntry)
439 .collect(Collectors.toList());
440
Carmelo Cascone9f007702017-08-24 13:30:51 +0200441 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400442
443 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
444 }
445
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200446 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
447 try {
448 //encode the PiPacketOperation into a PacketOut
449 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
450
451 //Build the request
452 StreamMessageRequest packetOutRequest = StreamMessageRequest
453 .newBuilder().setPacket(packetOut).build();
454
455 //Send the request
456 streamRequestObserver.onNext(packetOutRequest);
457
458 } catch (P4InfoBrowser.NotFoundException e) {
459 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
460 log.debug("Exception", e);
461 return false;
462 }
463 return true;
464 }
465
Carmelo Casconea966c342017-07-30 01:56:30 -0400466 private void doPacketIn(PacketIn packetInMsg) {
467
468 // Retrieve the pipeconf for this client's device.
469 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
470 if (pipeconfService == null) {
471 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
472 }
473 final PiPipeconf pipeconf;
474 if (pipeconfService.ofDevice(deviceId).isPresent() &&
475 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
476 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
477 } else {
478 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
479 return;
480 }
481 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800482 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200483 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
484 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400485 log.debug("Received packet in: {}", event);
486 controller.postEvent(event);
487 }
488
489 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700490 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400491
Yi Tseng3e7f1452017-10-20 10:31:53 -0700492 Uint128 electionId = arbitrationMsg.getElectionId();
493 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
494
495 if (mastershipFeature == null) {
496 log.warn("Can't find completable future of election id {}", electionId);
497 return;
498 }
499
500 this.p4RuntimeElectionId = electionId;
501 int statusCode = arbitrationMsg.getStatus().getCode();
502 MastershipRole arbitrationRole;
503 // arbitration update success
504
505 if (statusCode == Status.OK.getCode().value()) {
506 mastershipFeature.complete(true);
507 arbitrationRole = MastershipRole.MASTER;
508 } else {
509 mastershipFeature.complete(false);
510 arbitrationRole = MastershipRole.STANDBY;
511 }
512
513 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
514 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
515 arbitrationEventSubject);
516 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400517 }
518
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200519 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
520
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200521 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200522 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
523
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200524 final ReadRequest request = ReadRequest.newBuilder()
525 .setDeviceId(p4DeviceId)
526 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
527 .build();
528
529 if (request.getEntitiesList().size() == 0) {
530 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200531 }
532
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200533 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200534 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200535 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200536 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800537 logP4RuntimeErrorStatus(e, "Read counter");
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200538 return Collections.emptyList();
539 }
540
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200541 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200542 .map(ReadResponse::getEntitiesList)
543 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200544 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200545
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200546 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200547 }
548
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200549 private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200550 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Yi Tseng82512da2017-08-16 19:46:36 -0700551 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200552 for (PiActionGroupMember member : group.members()) {
553 actionProfileMembers.add(ActionProfileMemberEncoder.encode(group, member, pipeconf));
Yi Tseng82512da2017-08-16 19:46:36 -0700554 }
555 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200556 log.warn("Unable to write ({}) group members: {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700557 return false;
558 }
559
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200560 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700561 .map(actionProfileMember ->
562 Update.newBuilder()
563 .setEntity(Entity.newBuilder()
564 .setActionProfileMember(actionProfileMember)
565 .build())
566 .setType(UPDATE_TYPES.get(opType))
567 .build())
568 .collect(Collectors.toList());
569
570 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200571 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700572 return true;
573 }
574
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200575 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700576 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700577 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200578 .addAllUpdates(updateMsgs)
579 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700580 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200581 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700582 return true;
583 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800584 logP4RuntimeErrorStatus(e, String.format("%s group members", opType));
Yi Tseng82512da2017-08-16 19:46:36 -0700585 return false;
586 }
587 }
588
589 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
590 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
591 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200592
593 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700594 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200595 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700596 return Collections.emptySet();
597 }
598
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200599 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700600 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200601 actionProfileId = browser
602 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200603 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200604 .getPreamble()
605 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700606 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200607 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700608 return Collections.emptySet();
609 }
610
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200611 // Prepare read request to read all groups from the given action profile.
612 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700613 .setDeviceId(p4DeviceId)
614 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200615 .setActionProfileGroup(
616 ActionProfileGroup.newBuilder()
617 .setActionProfileId(actionProfileId)
618 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700619 .build())
620 .build();
621
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200622 // Read groups.
623 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700624 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200625 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700626 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800627 logP4RuntimeErrorStatus(e, String.format("Dump group from action profile %s",
628 piActionProfileId.id()));
Yi Tseng82512da2017-08-16 19:46:36 -0700629 return Collections.emptySet();
630 }
631
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200632 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
633 .map(ReadResponse::getEntitiesList)
634 .flatMap(List::stream)
635 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
636 .map(Entity::getActionProfileGroup)
637 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700638
639 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200640 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700641
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200642 // Returned groups contain only a minimal description of their members.
643 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700644
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200645 // Keep a map of all member IDs for each group ID, will need it later.
646 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
647 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
648 g.getGroupId(),
649 g.getMembersList().stream()
650 .map(ActionProfileGroup.Member::getMemberId)
651 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700652
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200653 // Prepare one big read request to read all members in one shot.
654 final Set<Entity> entityMsgs = groupMsgs.stream()
655 .flatMap(g -> g.getMembersList().stream())
656 .map(ActionProfileGroup.Member::getMemberId)
657 // Prevent issuing many read requests for the same member.
658 .distinct()
659 .map(id -> ActionProfileMember.newBuilder()
660 .setActionProfileId(actionProfileId)
661 .setMemberId(id)
662 .build())
663 .map(m -> Entity.newBuilder()
664 .setActionProfileMember(m)
665 .build())
666 .collect(Collectors.toSet());
667 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
668 .addAllEntities(entityMsgs)
669 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700670
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200671 // Read members.
672 final Iterator<ReadResponse> memberResponses;
673 try {
674 memberResponses = blockingStub.read(memberRequestMsg);
675 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800676 logP4RuntimeErrorStatus(e, String.format("Read members from action profile %s",
677 piActionProfileId.id()));
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200678 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700679 }
680
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200681 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
682 Tools.stream(() -> memberResponses)
683 .map(ReadResponse::getEntitiesList)
684 .flatMap(List::stream)
685 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
686 .map(Entity::getActionProfileMember)
687 .forEach(member -> groupIdToMemberIdsMap.asMap()
688 // Get all group IDs that contain this member.
689 .entrySet()
690 .stream()
691 .filter(entry -> entry.getValue().contains(member.getMemberId()))
692 .map(Map.Entry::getKey)
693 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
694
695 log.debug("Retrieved {} group members from action profile {} on {}...",
696 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
697
698 return groupMsgs.stream()
699 .map(groupMsg -> {
700 try {
701 return ActionProfileGroupEncoder.decode(groupMsg,
702 groupIdToMembersMap.get(groupMsg.getGroupId()),
703 pipeconf);
704 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
705 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
706 return null;
707 }
708 })
709 .filter(Objects::nonNull)
710 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700711 }
712
713 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200714 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700715 try {
716 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
717 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200718 log.warn("Unable to encode group: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700719 return false;
720 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200721
722 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700723 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700724 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200725 .addUpdates(Update.newBuilder()
726 .setEntity(Entity.newBuilder()
727 .setActionProfileGroup(actionProfileGroup)
728 .build())
729 .setType(UPDATE_TYPES.get(opType))
730 .build())
731 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700732 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200733 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700734 return true;
735 } catch (StatusRuntimeException e) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800736 logP4RuntimeErrorStatus(e, String.format("%s group", opType));
Yi Tseng82512da2017-08-16 19:46:36 -0700737 return false;
738 }
739 }
740
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400741 /**
742 * Returns the internal P4 device ID associated with this client.
743 *
744 * @return P4 device ID
745 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200746 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400747 return p4DeviceId;
748 }
749
750 /**
751 * For testing purpose only. TODO: remove before release.
752 *
753 * @return blocking stub
754 */
755 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
756 return this.blockingStub;
757 }
758
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200759
Andrea Campanella432f7182017-07-14 18:43:27 +0200760 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400761 public void shutdown() {
762
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400763 log.info("Shutting down client for {}...", deviceId);
764
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400765 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400766 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400767 if (streamRequestObserver != null) {
768 streamRequestObserver.onCompleted();
769 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
770 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400771
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400772 this.executorService.shutdown();
773 try {
774 executorService.awaitTermination(5, TimeUnit.SECONDS);
775 } catch (InterruptedException e) {
776 log.warn("Executor service didn't shutdown in time.");
Ray Milkey5c7d4882018-02-05 14:50:39 -0800777 Thread.currentThread().interrupt();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400778 }
779 } finally {
780 writeLock.unlock();
781 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400782 }
783
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400784 /**
785 * Handles messages received from the device on the stream channel.
786 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400787 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
788
789 @Override
790 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400791 executorService.submit(() -> doNext(message));
792 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400793
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400794 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400795 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200796 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400797 switch (message.getUpdateCase()) {
798 case PACKET:
799 // Packet-in
800 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200801 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400802 case ARBITRATION:
803 doArbitrationUpdateFromDevice(message.getArbitration());
804 return;
805 default:
806 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
807 }
808 } catch (Throwable ex) {
809 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400810 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400811 }
812
813 @Override
814 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400815 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
816 // FIXME: we might want to recreate the channel.
817 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
818 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400819 }
820
821 @Override
822 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400823 log.warn("Stream channel for {} has completed", deviceId);
824 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400825 }
826 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800827}