Introduced SetPackage and SwitchControlProcessor RPC to gNOI implementation.
Change-Id: Ib8df537459b50bb83fadfe217e6ea5cd2bf6da5f
diff --git a/cli/src/main/java/org/onosproject/cli/net/SoftwareUpgradeCommand.java b/cli/src/main/java/org/onosproject/cli/net/SoftwareUpgradeCommand.java
new file mode 100644
index 0000000..7031a78
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/SoftwareUpgradeCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli.net;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.behaviour.SoftwareUpgrade;
+import org.onosproject.net.behaviour.SoftwareUpgrade.Response;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * CLI command to Administratively upgrade device.
+ */
+@Service
+@Command(scope = "onos", name = "device-upgrade",
+ description = "Administratively upgrades a device")
+public class SoftwareUpgradeCommand extends AbstractShellCommand {
+ @Argument(index = 0, name = "deviceId", description = "Device ID",
+ required = true, multiValued = false)
+ @Completion(DeviceIdCompleter.class)
+ String deviceId = null;
+
+ @Option(name = "-p", aliases = "--package-to-upload",
+ description = "Path to the package to be installed",
+ required = false, multiValued = false)
+ String packageToUpload = null;
+
+ @Option(name = "-d", aliases = "--device-local-path",
+ description = "Path on target device where (if specified) the package will be uploaded "
+ + "and/or the package to be installed",
+ required = false, multiValued = false)
+ String deviceLocalPath = null;
+
+ @Override
+ protected void doExecute() {
+ if (packageToUpload == null && deviceLocalPath == null) {
+ log.warn("Please specify the path to the file you want to install");
+ return;
+ }
+ Device device = get(DeviceService.class).getDevice(DeviceId.deviceId(deviceId));
+
+ if (device == null) {
+ log.warn("Device {} does not exist", deviceId);
+ return;
+ }
+
+ if (!device.is(SoftwareUpgrade.class)) {
+ log.warn("Device {} does not support {} behaviour", deviceId, SoftwareUpgrade.class.getName());
+ return;
+ }
+
+ log.info("Starting upgrade for {} (check log for errors)...", deviceId);
+ CompletableFuture.supplyAsync(() -> {
+ if (packageToUpload != null) {
+ return device.as(SoftwareUpgrade.class)
+ .uploadPackage(packageToUpload, deviceLocalPath)
+ .join();
+ } else {
+ return deviceLocalPath;
+ }
+ })
+ .thenCompose((String pathOnDevice) -> {
+ if (pathOnDevice == null) {
+ log.warn("Package Upload for {} on {} failed", packageToUpload, deviceId);
+ return CompletableFuture.completedFuture(new Response());
+ }
+ return device.as(SoftwareUpgrade.class)
+ .swapAgent(pathOnDevice);
+ })
+ .whenComplete((Response result, Throwable exception) -> {
+ if (exception != null) {
+ log.error("Error while upgrading device {}", deviceId, exception);
+ } else if (result == null || !result.isSuccess()) {
+ log.warn("Upgrade on {} failed", deviceId);
+ } else {
+ log.info("Upgrade on {} succeeded! \n" +
+ "New SW version: {} \n" +
+ "Uptime: {} ns",
+ deviceId, result.getVersion(), result.getUptime());
+ }
+ });
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/SoftwareUpgrade.java b/core/api/src/main/java/org/onosproject/net/behaviour/SoftwareUpgrade.java
index c6cdb44..24bb597 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/SoftwareUpgrade.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/SoftwareUpgrade.java
@@ -20,7 +20,6 @@
import org.onosproject.net.driver.HandlerBehaviour;
-import java.net.URI;
import java.util.concurrent.CompletableFuture;
/**
@@ -31,32 +30,55 @@
public interface SoftwareUpgrade extends HandlerBehaviour {
/**
- * Completion status of upgrade.
+ * Upload a package to device. If no destination path is specified
+ * the package will be stored in the /tmp folder with a randomized name.
+ *
+ * @param sourcePath path to local package.
+ * @param destinationPath (optional) path where the package will be saved.
+ * @return path where the package was saved on device or null in case of an error.
*/
- public enum Status {
- /**
- * Indicates a successfully completed upgrade.
- */
- SUCCESS,
+ public CompletableFuture<String> uploadPackage(String sourcePath, String destinationPath);
- /**
- * Indicates an aborted upgrade.
- */
- FAILURE
+ /**
+ * Causes the device to switch from the current software agent
+ * to the provided agent.
+ *
+ * @param packagePath path to package on device.
+ * @return success - if no exceptions occured; device uptime; device version.
+ */
+ public CompletableFuture<Response> swapAgent(String packagePath);
+
+ /**
+ * Response of SwapAgent.
+ */
+ public final class Response {
+ private final Long uptime;
+ private final String version;
+ private final boolean success;
+
+ public Response(Long a, String b) {
+ uptime = a;
+ version = b;
+ success = true;
+ }
+
+ public Response() {
+ uptime = 0L;
+ version = "";
+ success = false;
+ }
+
+ public Long getUptime() {
+ return uptime;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
}
- /**
- * Configures the uri from where the upgrade will be pulled.
- *
- * @param uri uri of the software upgrade location
- * @return boolean true if the uri was properly configured
- */
- boolean configureUri(URI uri);
-
- /**
- * Performs an upgrade.
- *
- * @return A future that will be completed when the upgrade completes
- */
- CompletableFuture<Status> upgrade();
}
diff --git a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiSoftwareUpgradeImpl.java b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiSoftwareUpgradeImpl.java
new file mode 100644
index 0000000..1ad50d2d
--- /dev/null
+++ b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiSoftwareUpgradeImpl.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.gnoi;
+
+import com.google.protobuf.ByteString;
+import gnoi.system.SystemOuterClass;
+import gnoi.system.SystemOuterClass.SetPackageRequest;
+import gnoi.system.SystemOuterClass.SetPackageResponse;
+import gnoi.system.SystemOuterClass.SwitchControlProcessorRequest;
+import gnoi.types.Types;
+import org.onosproject.gnoi.api.GnoiClient;
+import org.onosproject.gnoi.api.GnoiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandlerBehaviour;
+import org.onosproject.net.behaviour.SoftwareUpgrade;
+import org.apache.commons.io.FilenameUtils;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.UUID;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import javax.xml.bind.DatatypeConverter;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * Implementation that upgrades the software on a device.
+ */
+public class GnoiSoftwareUpgradeImpl
+ extends AbstractGrpcHandlerBehaviour<GnoiClient, GnoiController>
+ implements SoftwareUpgrade {
+
+ private static final String HASHING_METHOD = "MD5";
+ private static final int MAX_CHUNK_SIZE = 64_000;
+ private static final Path DEFAULT_PACKAGE_PATH = Paths.get("/tmp");
+ private static final int STREAM_TIMEOUT_SECONDS = 10;
+
+ public GnoiSoftwareUpgradeImpl() {
+ super(GnoiController.class);
+ }
+
+ @Override
+ public CompletableFuture<String> uploadPackage(String sourcePath, String dst) {
+ if (!setupBehaviour("uploadPackage()")) {
+ return CompletableFuture.completedFuture(null);
+ }
+ checkNotNull(sourcePath, "Source file not specified.");
+
+ final CompletableFuture<String> future = new CompletableFuture<>();
+
+ final File deb = Paths.get(sourcePath).toFile();
+ final String destinationPath;
+
+ final SetPackageRequest.Builder requestBuilder = SetPackageRequest.newBuilder();
+ final SystemOuterClass.Package.Builder pkgBuilder = SystemOuterClass.Package.newBuilder();
+ final Types.HashType.Builder hashBuilder = Types.HashType.newBuilder();
+
+ final SynchronousQueue<SetPackageRequest> stream = new SynchronousQueue<>();
+ final CompletableFuture<SetPackageResponse> futureResponse = client.setPackage(stream);
+
+ if (dst == null) {
+ destinationPath = getTempPath(sourcePath);
+ } else {
+ destinationPath = dst;
+ }
+
+ futureResponse.whenComplete((response, exception) -> {
+ if (exception == null) {
+ future.complete(destinationPath);
+ } else {
+ future.complete(null);
+ }
+ });
+
+ // Handle reading file, creating requests, etc...
+ CompletableFuture.runAsync(() -> {
+ try {
+
+ if (!deb.isFile()) {
+ log.error("File {} does not exist", sourcePath);
+ future.complete(null);
+ return;
+ }
+ // Set general package info (filename, version, etc...).
+ pkgBuilder.setFilename(destinationPath);
+ requestBuilder.setPackage(pkgBuilder.build());
+ boolean requestSent = stream.offer(requestBuilder.build(), STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!requestSent) {
+ future.complete(null);
+ return;
+ }
+
+ final MessageDigest md = MessageDigest.getInstance(HASHING_METHOD);
+ final FileInputStream buffer = new FileInputStream(deb);
+ byte[] contents = new byte[MAX_CHUNK_SIZE];
+ int read = 0;
+
+ // Read file in 64k chunks.
+ while ((read = buffer.read(contents, 0, MAX_CHUNK_SIZE)) != -1) {
+ // Calculate File hash.
+ md.update(contents, 0, read);
+
+ // Form next request.
+ requestBuilder.setContents(ByteString.copyFrom(contents, 0, read));
+
+ // Add file chunk to stream.
+ requestSent = stream.offer(requestBuilder.build(), STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!requestSent) {
+ future.complete(null);
+ return;
+ }
+ }
+
+ // Convert hash to lowercase string.
+ String hash = DatatypeConverter
+ .printHexBinary(md.digest())
+ .toLowerCase();
+
+ hashBuilder
+ .setMethodValue(Types.HashType.HashMethod.MD5.getNumber())
+ .setHash(ByteString.copyFrom(hash.getBytes()));
+
+ // Form last request with file hash.
+ requestBuilder.setHash(hashBuilder.build());
+
+ // Add file chunk to stream.
+ requestSent = stream.offer(requestBuilder.build(), STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!requestSent) {
+ future.complete(null);
+ return;
+ }
+
+ } catch (IOException e) {
+ log.error("Error while reading file {}", sourcePath, e);
+ future.complete(null);
+ return;
+ } catch (InterruptedException e) {
+ log.error("Interrupted while sending package", e);
+ future.complete(null);
+ return;
+ } catch (SecurityException e) {
+ log.error("File {} cannot be accessed", sourcePath, e);
+ future.complete(null);
+ return;
+ } catch (NoSuchAlgorithmException e) {
+ log.error("Invalid hashing algorithm {}", HASHING_METHOD, e);
+ future.complete(null);
+ return;
+ }
+ });
+
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Response> swapAgent(String packagePath) {
+ if (!setupBehaviour("swapAgent()")) {
+ return CompletableFuture.completedFuture(new Response());
+ }
+ checkNotNull(packagePath, "File path not specified.");
+
+ final CompletableFuture<Response> future = new CompletableFuture<>();
+ final Types.Path.Builder routeProcessor = Types.Path.newBuilder();
+ final SwitchControlProcessorRequest.Builder requestMsg = SwitchControlProcessorRequest.newBuilder();
+
+ Paths.get(packagePath)
+ .iterator()
+ .forEachRemaining(part -> {
+ routeProcessor.addElem(
+ Types.PathElem.newBuilder()
+ .setName(part.toString())
+ .build());
+ });
+
+ requestMsg.setControlProcessor(routeProcessor.build());
+
+ client.switchControlProcessor(requestMsg.build())
+ .whenComplete((response, exception) -> {
+ if (exception != null) {
+ future.complete(new Response());
+ } else {
+ future.complete(new Response(response.getUptime(), response.getVersion()));
+ }
+ });
+
+ return future;
+ }
+
+ private String getTempPath(String source) {
+ String baseName = FilenameUtils.getBaseName(source);
+ String extension = FilenameUtils.getExtension(source);
+
+ if (extension.length() != 0) {
+ extension = "." + extension;
+ }
+
+ String filename = baseName + "_" + UUID.randomUUID().toString() + extension;
+ return DEFAULT_PACKAGE_PATH.resolve(filename).toString();
+ }
+}
diff --git a/drivers/gnoi/src/main/resources/gnoi-drivers.xml b/drivers/gnoi/src/main/resources/gnoi-drivers.xml
index 891fcd7..028a1ee 100644
--- a/drivers/gnoi/src/main/resources/gnoi-drivers.xml
+++ b/drivers/gnoi/src/main/resources/gnoi-drivers.xml
@@ -20,6 +20,8 @@
impl="org.onosproject.drivers.gnoi.GnoiDeviceDescriptionDiscovery"/>
<behaviour api="org.onosproject.net.behaviour.BasicSystemOperations"
impl="org.onosproject.drivers.gnoi.GnoiBasicSystemOperationsImpl"/>
+ <behaviour api="org.onosproject.net.behaviour.SoftwareUpgrade"
+ impl="org.onosproject.drivers.gnoi.GnoiSoftwareUpgradeImpl"/>
<behaviour api="org.onosproject.net.device.DeviceHandshaker"
impl="org.onosproject.drivers.gnoi.GnoiHandshaker"/>
</driver>
diff --git a/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClient.java b/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClient.java
index d66c6f1..277d0c3 100644
--- a/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClient.java
+++ b/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClient.java
@@ -19,9 +19,14 @@
import gnoi.system.SystemOuterClass.TimeResponse;
import gnoi.system.SystemOuterClass.RebootRequest;
import gnoi.system.SystemOuterClass.RebootResponse;
+import gnoi.system.SystemOuterClass.SetPackageRequest;
+import gnoi.system.SystemOuterClass.SetPackageResponse;
+import gnoi.system.SystemOuterClass.SwitchControlProcessorRequest;
+import gnoi.system.SystemOuterClass.SwitchControlProcessorResponse;
import com.google.common.annotations.Beta;
import org.onosproject.grpc.api.GrpcClient;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.SynchronousQueue;
/**
* Client to control a gNOI server.
@@ -43,4 +48,23 @@
* @return the RebootResponse result
*/
CompletableFuture<RebootResponse> reboot(RebootRequest request);
+
+ /**
+ * Executes a SetPackage RPC for the given list
+ * of SetPackageRequest messages.
+ *
+ * @param stream list of SetPackageRequests.
+ * @return SetPackageResponse from device.
+ */
+ CompletableFuture<SetPackageResponse> setPackage(SynchronousQueue<SetPackageRequest> stream);
+
+ /**
+ * Executes a SwitchControlProcessor RPC given the path
+ * to the new software agent.
+ *
+ * @param request SwitchControlProcessorRequest
+ * @return SwitchControlProcessorResponse from device.
+ */
+ CompletableFuture<SwitchControlProcessorResponse> switchControlProcessor(SwitchControlProcessorRequest request);
+
}
diff --git a/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java b/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java
index 84c8364..2e31b8c 100644
--- a/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java
+++ b/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java
@@ -21,8 +21,13 @@
import gnoi.system.SystemOuterClass.RebootResponse;
import gnoi.system.SystemOuterClass.TimeRequest;
import gnoi.system.SystemOuterClass.TimeResponse;
+import gnoi.system.SystemOuterClass.SetPackageRequest;
+import gnoi.system.SystemOuterClass.SetPackageResponse;
+import gnoi.system.SystemOuterClass.SwitchControlProcessorRequest;
+import gnoi.system.SystemOuterClass.SwitchControlProcessorResponse;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
+import io.grpc.stub.ClientCallStreamObserver;
import org.onosproject.gnoi.api.GnoiClient;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.onosproject.net.DeviceId;
@@ -32,6 +37,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.concurrent.SynchronousQueue;
/**
* Implementation of gNOI client.
@@ -40,6 +46,7 @@
private static final int RPC_TIMEOUT_SECONDS = 10;
private static final Logger log = LoggerFactory.getLogger(GnoiClientImpl.class);
+ private ClientCallStreamObserver<SetPackageRequest> requestObserver = null;
GnoiClientImpl(DeviceId deviceId, ManagedChannel managedChannel, GnoiControllerImpl controller) {
super(deviceId, managedChannel, false, controller);
@@ -110,6 +117,69 @@
return future;
}
+ @Override
+ public CompletableFuture<SetPackageResponse> setPackage(SynchronousQueue<SetPackageRequest> stream) {
+ final CompletableFuture<SetPackageResponse> future = new CompletableFuture<>();
+ final StreamObserver<SetPackageResponse> observer =
+ new StreamObserver<SetPackageResponse>() {
+ @Override
+ public void onNext(SetPackageResponse value) {
+ future.complete(value);
+ }
+ @Override
+ public void onError(Throwable t) {
+ future.completeExceptionally(t);
+ handleRpcError(t, "gNOI set package request");
+ }
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+ };
+
+ execRpc(s -> requestObserver =
+ (ClientCallStreamObserver<SetPackageRequest>) s.setPackage(observer));
+
+ CompletableFuture.runAsync(() -> {
+ SetPackageRequest request;
+ try {
+ while ((request = stream.poll(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)) != null) {
+ requestObserver.onNext(request);
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ });
+
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<SwitchControlProcessorResponse> switchControlProcessor(
+ SwitchControlProcessorRequest request) {
+
+ final CompletableFuture<SwitchControlProcessorResponse> future = new CompletableFuture<>();
+ final StreamObserver<SwitchControlProcessorResponse> observer =
+ new StreamObserver<SwitchControlProcessorResponse>() {
+ @Override
+ public void onNext(SwitchControlProcessorResponse value) {
+ future.complete(value);
+ }
+ @Override
+ public void onError(Throwable t) {
+ handleRpcError(t, "gNOI SwitchControlProcessor request");
+ future.completeExceptionally(t);
+ }
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+ };
+
+ execRpc(s -> s.switchControlProcessor(request, observer));
+ return future;
+ }
+
/**
* Forces execution of an RPC in a cancellable context with a timeout.
*