blob: 308119480c88c479d247fb90e5a587e39bf49aed [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Jonathan Hart4f60f982014-10-27 08:11:17 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Jonathan Hart4f60f982014-10-27 08:11:17 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080024import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.packet.OutboundPacket;
29import org.onosproject.net.packet.PacketEvent;
30import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070031import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.net.packet.PacketStore;
33import org.onosproject.net.packet.PacketStoreDelegate;
34import org.onosproject.store.AbstractStore;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
alshabib42947782015-03-31 14:59:06 -070041import org.onosproject.store.service.ConsistentMap;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070044import org.slf4j.Logger;
45
alshabib42947782015-03-31 14:59:06 -070046import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080047import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49
50import static org.onlab.util.Tools.groupedThreads;
51import static org.slf4j.LoggerFactory.getLogger;
52
Jonathan Hart4f60f982014-10-27 08:11:17 -070053/**
54 * Distributed packet store implementation allowing packets to be sent to
55 * remote instances.
56 */
57@Component(immediate = true)
58@Service
59public class DistributedPacketStore
60 extends AbstractStore<PacketEvent, PacketStoreDelegate>
61 implements PacketStore {
62
63 private final Logger log = getLogger(getClass());
64
Madan Jampani2af244a2015-02-22 13:12:01 -080065 // TODO: make this configurable.
66 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
67
Jonathan Hart4f60f982014-10-27 08:11:17 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070069 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070070
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070072 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070073
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070075 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070076
alshabib42947782015-03-31 14:59:06 -070077 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected StorageService storageService;
79
80 private PacketRequestTracker tracker;
81
Jonathan Hart4f60f982014-10-27 08:11:17 -070082 private static final MessageSubject PACKET_OUT_SUBJECT =
83 new MessageSubject("packet-out");
84
85 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
86 @Override
87 protected void setupKryoPool() {
88 serializerPool = KryoNamespace.newBuilder()
89 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080090 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
91 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070092 }
93 };
94
Madan Jampani2af244a2015-02-22 13:12:01 -080095 private ExecutorService messageHandlingExecutor;
96
Jonathan Hart4f60f982014-10-27 08:11:17 -070097 @Activate
98 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -080099 messageHandlingExecutor = Executors.newFixedThreadPool(
100 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800101 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700102
Thomas Vachuskaff965232015-03-17 14:10:52 -0700103 communicationService.addSubscriber(PACKET_OUT_SUBJECT,
104 new InternalClusterMessageHandler(),
105 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800106
alshabib42947782015-03-31 14:59:06 -0700107 tracker = new PacketRequestTracker();
108
Madan Jampani2af244a2015-02-22 13:12:01 -0800109 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700110 }
111
112 @Deactivate
113 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800114 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
115 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700116 log.info("Stopped");
117 }
118
119 @Override
120 public void emit(OutboundPacket packet) {
121 NodeId myId = clusterService.getLocalNode().id();
122 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
123
Jonathan Hart7466d612014-11-24 17:09:53 -0800124 if (master == null) {
125 return;
126 }
127
Jonathan Hart4f60f982014-10-27 08:11:17 -0700128 if (myId.equals(master)) {
129 notifyDelegate(new PacketEvent(Type.EMIT, packet));
130 return;
131 }
132
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800133 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700134 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800135 // error log: log.warn("Failed to send packet-out to {}", master);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700136 }
137
alshabib42947782015-03-31 14:59:06 -0700138 @Override
139 public boolean requestPackets(PacketRequest request) {
140 return tracker.add(request);
141 }
142
143 @Override
144 public Set<PacketRequest> existingRequests() {
145 return tracker.requests();
146 }
147
Jonathan Hart4f60f982014-10-27 08:11:17 -0700148 /**
149 * Handles incoming cluster messages.
150 */
151 private class InternalClusterMessageHandler implements ClusterMessageHandler {
152 @Override
153 public void handle(ClusterMessage message) {
154 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
155 log.warn("Received message with wrong subject: {}", message);
156 }
157
158 OutboundPacket packet = SERIALIZER.decode(message.payload());
159 notifyDelegate(new PacketEvent(Type.EMIT, packet));
160 }
161 }
162
alshabib42947782015-03-31 14:59:06 -0700163 private class PacketRequestTracker {
164
165 private ConsistentMap<PacketRequest, Boolean> requests;
166
167 public PacketRequestTracker() {
168 requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
169 .withName("packet-requests")
170 .withSerializer(new Serializer() {
171 KryoNamespace kryo = new KryoNamespace.Builder()
172 .register(KryoNamespaces.API)
173 .build();
174 @Override
175 public <T> byte[] encode(T object) {
176 return kryo.serialize(object);
177 }
178
179 @Override
180 public <T> T decode(byte[] bytes) {
181 return kryo.deserialize(bytes);
182 }
183 }).build();
184 }
185
186 public boolean add(PacketRequest request) {
187 if (requests.putIfAbsent(request, true) == null) {
188 return true;
189 }
190 return false;
191 }
192
193 public boolean remove(PacketRequest request) {
194 if (requests.remove(request) == null) {
195 return false;
196 }
197 return true;
198 }
199
200 public Set<PacketRequest> requests() {
201 return requests.keySet();
202 }
203
204 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700205}