blob: 152106a9981b3a588d30749743562ea66ae1629c [file] [log] [blame]
Andrea Campanella378e21a2017-06-07 12:09:59 +02001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.ImmutableSet;
20import io.grpc.ManagedChannel;
21import io.grpc.ManagedChannelBuilder;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040022import io.grpc.Status;
23import io.grpc.StatusRuntimeException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Service;
28import org.onosproject.grpc.api.GrpcChannelId;
29import org.onosproject.grpc.api.GrpcController;
30import org.onosproject.grpc.api.GrpcObserverHandler;
31import org.onosproject.grpc.api.GrpcStreamObserverId;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040032import org.onosproject.grpc.ctl.dummy.Dummy;
33import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
Andrea Campanella378e21a2017-06-07 12:09:59 +020034import org.onosproject.net.DeviceId;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Carmelo Cascone59f57de2017-07-11 19:55:09 -040038import java.io.IOException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020039import java.util.Collection;
40import java.util.HashSet;
41import java.util.Map;
42import java.util.Optional;
43import java.util.Set;
44import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040045import java.util.concurrent.TimeUnit;
Andrea Campanella378e21a2017-06-07 12:09:59 +020046
47/**
48 * Default implementation of the GrpcController.
49 */
50@Component(immediate = true)
51@Service
52public class GrpcControllerImpl implements GrpcController {
53
Carmelo Cascone59f57de2017-07-11 19:55:09 -040054 private static final int CONNECTION_TIMEOUT_SECONDS = 20;
55
Andrea Campanella378e21a2017-06-07 12:09:59 +020056 public static final Logger log = LoggerFactory
57 .getLogger(GrpcControllerImpl.class);
58
59 private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
60 private Map<GrpcChannelId, ManagedChannel> channels;
61 private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
62
63 @Activate
64 public void activate() {
65 observers = new ConcurrentHashMap<>();
66 channels = new ConcurrentHashMap<>();
67 channelBuilders = new ConcurrentHashMap<>();
68 log.info("Started");
69 }
70
71 @Deactivate
72 public void deactivate() {
73 channels.values().forEach(ManagedChannel::shutdown);
74 observers.clear();
75 channels.clear();
76 channelBuilders.clear();
77 log.info("Stopped");
78 }
79
80 @Override
81 public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
82 grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
83 observers.put(observerId, grpcObserverHandler);
84 }
85
86 @Override
87 public void removeObserver(GrpcStreamObserverId observerId) {
88 observers.get(observerId).removeObserver();
89 observers.remove(observerId);
90 }
91
92 @Override
93 public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
94 return Optional.ofNullable(observers.get(observerId));
95 }
96
97 @Override
Carmelo Cascone59f57de2017-07-11 19:55:09 -040098 public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
99 throws IOException {
Andrea Campanella378e21a2017-06-07 12:09:59 +0200100 ManagedChannel channel = channelBuilder.build();
101
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400102 // Forced connection not yet implemented. Use workaround...
103 // channel.getState(true);
104 doDummyMessage(channel);
105
Andrea Campanella378e21a2017-06-07 12:09:59 +0200106 channelBuilders.put(channelId, channelBuilder);
107 channels.put(channelId, channel);
108 return channel;
109 }
110
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400111 private void doDummyMessage(ManagedChannel channel) throws IOException {
112 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
113 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
114 try {
115 dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
116 } catch (StatusRuntimeException e) {
117 if (e.getStatus() != Status.UNIMPLEMENTED) {
118 // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
119 // Hence, channel is open.
120 throw new IOException(e);
121 }
122 }
123 }
124
125 @Override
126 public boolean isChannelOpen(GrpcChannelId channelId) {
127 if (!channels.containsKey(channelId)) {
128 log.warn("Can't check if channel open for unknown channel id {}", channelId);
129 return false;
130 }
131
132 try {
133 doDummyMessage(channels.get(channelId));
134 return true;
135 } catch (IOException e) {
136 return false;
137 }
138 }
139
Andrea Campanella378e21a2017-06-07 12:09:59 +0200140 @Override
141 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400142 if (!channels.containsKey(channelId)) {
143 // Nothing to do.
144 return;
145 }
146 ManagedChannel channel = channels.get(channelId);
147
148 try {
149 channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
150 } catch (InterruptedException e) {
151 log.warn("Channel {} didn't shut down in time.");
152 channel.shutdownNow();
153 }
154
Andrea Campanella378e21a2017-06-07 12:09:59 +0200155 channels.remove(channelId);
156 channelBuilders.remove(channelId);
157 }
158
159 @Override
160 public Map<GrpcChannelId, ManagedChannel> getChannels() {
161 return channels;
162 }
163
164 @Override
165 public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
166 final Set<ManagedChannel> deviceChannels = new HashSet<>();
167 channels.forEach((k, v) -> {
168 if (k.deviceId().equals(deviceId)) {
169 deviceChannels.add(v);
170 }
171 });
172
173 return ImmutableSet.copyOf(deviceChannels);
174 }
175
176 @Override
177 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
178 return Optional.ofNullable(channels.get(channelId));
179 }
180}