blob: 77e87ef36b3802ee5b88a91fdec03b978a313dc9 [file] [log] [blame]
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.protocol.restconf.ctl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.glassfish.jersey.client.ChunkedInput;
import org.onlab.packet.IpAddress;
import org.onosproject.net.DeviceId;
import org.onosproject.protocol.http.ctl.HttpSBControllerImpl;
import org.onosproject.protocol.rest.RestSBDevice;
import org.onosproject.protocol.restconf.RestConfNotificationEventListener;
import org.onosproject.protocol.restconf.RestConfSBController;
import org.onosproject.protocol.restconf.RestconfNotificationEventListener;
import org.onosproject.yms.ych.YangProtocolEncodingFormat;
import org.onosproject.yms.ymsm.YmsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * The implementation of RestConfSBController.
 *
 * <p>Data requests are prefixed with the RESTCONF root resource and the
 * {@code /data/} path before being delegated to the HTTP controller this
 * class extends. Notification streams are read by one
 * {@link GetChunksRunnable} per device, run on a cached thread pool, and each
 * received chunk is handed to the listeners registered for that device.
 */
@Component(immediate = true)
@Service
public class RestConfSBControllerImpl extends HttpSBControllerImpl
        implements RestConfSBController {

    private static final Logger log = LoggerFactory
            .getLogger(RestConfSBControllerImpl.class);

    // TODO: for the Ibis release when both RESTCONF server and RESTCONF client
    // fully support root resource discovery, ROOT_RESOURCE constant will be
    // removed and rather the value would get discovered dynamically.
    private static final String ROOT_RESOURCE = "/onos/restconf";

    private static final String RESOURCE_PATH_PREFIX = "/data/";
    private static final String NOTIFICATION_PATH_PREFIX = "/streams/";

    // Listener sets are CopyOnWriteArraySet instances: they are iterated on
    // the notification reader thread while callers add/remove listeners from
    // other threads.
    private final ConcurrentMap<DeviceId, Set<RestconfNotificationEventListener>>
            restconfNotificationListenerMap = new ConcurrentHashMap<>();

    // One entry per device whose notification stream is currently being read.
    private final ConcurrentMap<DeviceId, GetChunksRunnable> runnableTable =
            new ConcurrentHashMap<>();

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected YmsService ymsService;

    ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Registers the JSON codec as the default YMS codec, if the YMS service
     * reference is set.
     */
    @Activate
    public void activate() {
        log.info("RESTCONF SBI Started");
        if (ymsService != null) {
            ymsService
                    .registerDefaultCodec(new JsonYdtCodec(ymsService),
                                          YangProtocolEncodingFormat.JSON);
        }
    }

    /**
     * Signals every notification reader to stop, shuts the executor down and
     * clears the notification, client and device tables.
     *
     * <p>A reader that is blocked waiting for a chunk only observes the stop
     * signal once that read returns.
     */
    @Deactivate
    public void deactivate() {
        log.info("RESTCONF SBI Stopped");
        for (GetChunksRunnable runnable : runnableTable.values()) {
            runnable.terminate();
        }
        runnableTable.clear();
        restconfNotificationListenerMap.clear();
        executor.shutdown();
        this.getClientMap().clear();
        this.getDeviceMap().clear();
    }

    @Override
    public Map<DeviceId, RestSBDevice> getDevices() {
        log.trace("RESTCONF SBI::getDevices");
        return super.getDevices();
    }

    @Override
    public RestSBDevice getDevice(DeviceId deviceInfo) {
        log.trace("RESTCONF SBI::getDevice with deviceId");
        return super.getDevice(deviceInfo);
    }

    @Override
    public RestSBDevice getDevice(IpAddress ip, int port) {
        log.trace("RESTCONF SBI::getDevice with ip and port");
        return super.getDevice(ip, port);
    }

    @Override
    public void addDevice(RestSBDevice device) {
        log.trace("RESTCONF SBI::addDevice");
        super.addDevice(device);
    }

    @Override
    public void removeDevice(DeviceId deviceId) {
        log.trace("RESTCONF SBI::removeDevice");
        super.removeDevice(deviceId);
    }

    @Override
    public boolean post(DeviceId device, String request, InputStream payload,
                        String mediaType) {
        return super.post(device, dataPath(device, request), payload, mediaType);
    }

    @Override
    public <T> T post(DeviceId device, String request, InputStream payload,
                      String mediaType, Class<T> responseClass) {
        return super.post(device, dataPath(device, request), payload,
                          mediaType, responseClass);
    }

    @Override
    public boolean put(DeviceId device, String request, InputStream payload,
                       String mediaType) {
        return super.put(device, dataPath(device, request), payload, mediaType);
    }

    @Override
    public InputStream get(DeviceId device, String request, String mediaType) {
        return super.get(device, dataPath(device, request), mediaType);
    }

    @Override
    public boolean patch(DeviceId device, String request, InputStream payload,
                         String mediaType) {
        return super.patch(device, dataPath(device, request), payload, mediaType);
    }

    @Override
    public boolean delete(DeviceId device, String request, InputStream payload,
                          String mediaType) {
        return super.delete(device, dataPath(device, request), payload, mediaType);
    }

    /**
     * Starts reading the notification stream of a device on a background
     * thread and registers the given listener for it.
     *
     * <p>If a stream is already being read for the device, a warning is
     * logged and nothing else happens (the listener is not registered).
     *
     * @param device    device identifier
     * @param request   stream path, relative to the root resource's
     *                  {@code /streams/} path
     * @param mediaType media type requested from the device
     * @param listener  listener to be notified of every received chunk
     */
    @Override
    public void enableNotifications(DeviceId device, String request,
                                    String mediaType,
                                    RestconfNotificationEventListener listener) {
        // Cheap early exit before the root resource is discovered.
        if (isNotificationEnabled(device)) {
            log.warn("enableNotifications: already enabled on device: {}", device);
            return;
        }

        GetChunksRunnable runnable = new GetChunksRunnable(
                streamPath(device, request), mediaType, device);

        // putIfAbsent makes the "already enabled" decision atomic, so two
        // concurrent callers cannot both start a reader for the same device.
        if (runnableTable.putIfAbsent(device, runnable) != null) {
            log.warn("enableNotifications: already enabled on device: {}", device);
            return;
        }

        addNotificationListener(device, listener);
        try {
            executor.execute(runnable);
        } catch (RuntimeException e) {
            // The task never started; do not leave the device marked as enabled.
            runnableTable.remove(device, runnable);
            throw e;
        }
    }

    @Override
    public void enableNotifications(DeviceId device, String request,
                                    String mediaType,
                                    RestConfNotificationEventListener callBackListener) {
        //TODO: to be removed once the call to the API is updated.
    }

    /**
     * Stops delivering notifications for a device and drops all of its
     * registered listeners.
     *
     * <p>If no stream is being read for the device, a warning is logged
     * instead of failing.
     *
     * @param device device identifier
     */
    public void stopNotifications(DeviceId device) {
        GetChunksRunnable runnable = runnableTable.remove(device);
        restconfNotificationListenerMap.remove(device);
        if (runnable == null) {
            log.warn("stopNotifications: notifications not enabled on device: {}", device);
            return;
        }
        runnable.terminate();
        log.debug("Stop sending notifications for device URI: {}", device.uri());
    }

    /**
     * Task that reads a chunked notification stream from a device and passes
     * every chunk to the listeners registered for that device.
     */
    public class GetChunksRunnable implements Runnable {
        private final String request;
        private final String mediaType;
        private final DeviceId device;

        // Written by the thread calling terminate(), read by the reader thread.
        private volatile boolean running = true;

        /**
         * Asks this task to stop. The flag is checked between chunk reads, so
         * a read that is already blocked completes first.
         */
        public void terminate() {
            running = false;
        }

        /**
         * Creates a reader for one notification stream.
         *
         * @param request   request
         * @param mediaType media type
         * @param device    device identifier
         */
        public GetChunksRunnable(String request, String mediaType,
                                 DeviceId device) {
            this.request = request;
            this.mediaType = mediaType;
            this.device = device;
        }

        /**
         * Issues the GET for the stream and loops over the received chunks
         * until terminated or until a null chunk is read. On exit the HTTP
         * resources are closed and this task is unregistered from the
         * enclosing controller.
         */
        @Override
        public void run() {
            Response clientResp = null;
            ChunkedInput<String> chunkedInput = null;
            try {
                WebTarget wt = getWebTarget(device, request);
                clientResp = wt.request(mediaType).get();
                chunkedInput = clientResp
                        .readEntity(new GenericType<ChunkedInput<String>>() {
                        });

                String chunk;
                // Note that the read() is a blocking operation and the invoking
                // thread is blocked until a new chunk comes. Jersey implementation
                // of this IO operation is in a way that it does not respond to
                // interrupts.
                while (running) {
                    chunk = chunkedInput.read();
                    if (chunk == null) {
                        log.trace("The received notification chunk is null. do not continue any more.");
                        break;
                    }
                    // terminate() may have been called while read() was blocked.
                    if (running) {
                        dispatch(chunk);
                    } else {
                        log.trace("the requesting client is no more interested "
                                          + "to receive such notifications.");
                    }
                }
                log.trace("out of while loop -- end of run");
            } finally {
                // Only remove the entry if it still refers to this task; a
                // newer reader may have been registered for the same device.
                runnableTable.remove(device, this);
                if (chunkedInput != null) {
                    chunkedInput.close();
                }
                if (clientResp != null) {
                    clientResp.close();
                }
            }
        }

        // Hands one chunk to every listener currently registered for the device.
        private void dispatch(String chunk) {
            Set<RestconfNotificationEventListener> listeners =
                    restconfNotificationListenerMap.get(device);
            if (listeners == null) {
                return;
            }
            for (RestconfNotificationEventListener listener : listeners) {
                listener.handleNotificationEvent(device, chunk);
            }
        }
    }

    /**
     * Returns the RESTCONF root resource of a device.
     *
     * @param device device identifier
     * @return root resource path; currently always the fixed default
     */
    public String discoverRootResource(DeviceId device) {
        // FIXME: send a GET command to the device to discover the root resource.
        // The plan to fix this is for the Ibis release when the RESTCONF server and
        // the RESTCONF client both support root resource discovery.
        return ROOT_RESOURCE;
    }

    /**
     * Registers a notification listener for a device, creating the device's
     * listener set on first use.
     *
     * @param deviceId device identifier
     * @param listener listener to register
     */
    @Override
    public void addNotificationListener(DeviceId deviceId,
                                        RestconfNotificationEventListener listener) {
        Set<RestconfNotificationEventListener> listeners =
                restconfNotificationListenerMap.get(deviceId);
        if (listeners == null) {
            Set<RestconfNotificationEventListener> fresh = new CopyOnWriteArraySet<>();
            // Another thread may have installed a set in the meantime; use theirs.
            listeners = restconfNotificationListenerMap.putIfAbsent(deviceId, fresh);
            if (listeners == null) {
                listeners = fresh;
            }
        }
        listeners.add(listener);
    }

    /**
     * Unregisters a notification listener from a device. Does nothing if the
     * device has no registered listeners.
     *
     * @param deviceId device identifier
     * @param listener listener to unregister
     */
    @Override
    public void removeNotificationListener(DeviceId deviceId,
                                           RestconfNotificationEventListener listener) {
        Set<RestconfNotificationEventListener> listeners =
                restconfNotificationListenerMap.get(deviceId);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }

    @Override
    public void removeNotificationListener(DeviceId deviceId) {
        //TODO: This API is obsolete.
    }

    /**
     * Tells whether a notification stream is currently registered for a device.
     *
     * @param deviceId device identifier
     * @return true if a reader task is registered for the device
     */
    public boolean isNotificationEnabled(DeviceId deviceId) {
        return runnableTable.containsKey(deviceId);
    }

    // Builds the full data-resource path: root resource + "/data/" + request.
    private String dataPath(DeviceId device, String request) {
        return discoverRootResource(device) + RESOURCE_PATH_PREFIX + request;
    }

    // Builds the full stream path: root resource + "/streams/" + request.
    private String streamPath(DeviceId device, String request) {
        return discoverRootResource(device) + NOTIFICATION_PATH_PREFIX + request;
    }
}