blob: 7ee8712049539f54d2f52cc989dff6ac88507eb7 [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Jonathan Hart4f60f982014-10-27 08:11:17 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080024import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.packet.OutboundPacket;
29import org.onosproject.net.packet.PacketEvent;
30import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070031import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.net.packet.PacketStore;
33import org.onosproject.net.packet.PacketStoreDelegate;
34import org.onosproject.store.AbstractStore;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
alshabib42947782015-03-31 14:59:06 -070041import org.onosproject.store.service.ConsistentMap;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070044import org.slf4j.Logger;
45
alshabib42947782015-03-31 14:59:06 -070046import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080047import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49
50import static org.onlab.util.Tools.groupedThreads;
51import static org.slf4j.LoggerFactory.getLogger;
52
Jonathan Hart4f60f982014-10-27 08:11:17 -070053/**
54 * Distributed packet store implementation allowing packets to be sent to
55 * remote instances.
56 */
57@Component(immediate = true)
58@Service
59public class DistributedPacketStore
60 extends AbstractStore<PacketEvent, PacketStoreDelegate>
61 implements PacketStore {
62
63 private final Logger log = getLogger(getClass());
64
Madan Jampani2af244a2015-02-22 13:12:01 -080065 // TODO: make this configurable.
66 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
67
Jonathan Hart4f60f982014-10-27 08:11:17 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070069 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070070
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070072 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070073
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070075 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070076
alshabib42947782015-03-31 14:59:06 -070077 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected StorageService storageService;
79
80 private PacketRequestTracker tracker;
81
Jonathan Hart4f60f982014-10-27 08:11:17 -070082 private static final MessageSubject PACKET_OUT_SUBJECT =
83 new MessageSubject("packet-out");
84
85 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
86 @Override
87 protected void setupKryoPool() {
88 serializerPool = KryoNamespace.newBuilder()
89 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080090 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
91 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070092 }
93 };
94
Madan Jampani2af244a2015-02-22 13:12:01 -080095 private ExecutorService messageHandlingExecutor;
96
Jonathan Hart4f60f982014-10-27 08:11:17 -070097 @Activate
98 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -080099 messageHandlingExecutor = Executors.newFixedThreadPool(
100 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800101 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700102
Thomas Vachuskaff965232015-03-17 14:10:52 -0700103 communicationService.addSubscriber(PACKET_OUT_SUBJECT,
104 new InternalClusterMessageHandler(),
105 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800106
alshabib42947782015-03-31 14:59:06 -0700107 tracker = new PacketRequestTracker();
108
Madan Jampani2af244a2015-02-22 13:12:01 -0800109 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700110 }
111
112 @Deactivate
113 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800114 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
115 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700116 log.info("Stopped");
117 }
118
119 @Override
120 public void emit(OutboundPacket packet) {
121 NodeId myId = clusterService.getLocalNode().id();
122 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
123
Jonathan Hart7466d612014-11-24 17:09:53 -0800124 if (master == null) {
125 return;
126 }
127
Jonathan Hart4f60f982014-10-27 08:11:17 -0700128 if (myId.equals(master)) {
129 notifyDelegate(new PacketEvent(Type.EMIT, packet));
130 return;
131 }
132
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800133 // TODO check unicast return value
Thomas Vachuskaff965232015-03-17 14:10:52 -0700134 communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT,
135 SERIALIZER.encode(packet)),
136 master);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800137 // error log: log.warn("Failed to send packet-out to {}", master);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700138 }
139
alshabib42947782015-03-31 14:59:06 -0700140 @Override
141 public boolean requestPackets(PacketRequest request) {
142 return tracker.add(request);
143 }
144
145 @Override
146 public Set<PacketRequest> existingRequests() {
147 return tracker.requests();
148 }
149
Jonathan Hart4f60f982014-10-27 08:11:17 -0700150 /**
151 * Handles incoming cluster messages.
152 */
153 private class InternalClusterMessageHandler implements ClusterMessageHandler {
154 @Override
155 public void handle(ClusterMessage message) {
156 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
157 log.warn("Received message with wrong subject: {}", message);
158 }
159
160 OutboundPacket packet = SERIALIZER.decode(message.payload());
161 notifyDelegate(new PacketEvent(Type.EMIT, packet));
162 }
163 }
164
alshabib42947782015-03-31 14:59:06 -0700165 private class PacketRequestTracker {
166
167 private ConsistentMap<PacketRequest, Boolean> requests;
168
169 public PacketRequestTracker() {
170 requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
171 .withName("packet-requests")
172 .withSerializer(new Serializer() {
173 KryoNamespace kryo = new KryoNamespace.Builder()
174 .register(KryoNamespaces.API)
175 .build();
176 @Override
177 public <T> byte[] encode(T object) {
178 return kryo.serialize(object);
179 }
180
181 @Override
182 public <T> T decode(byte[] bytes) {
183 return kryo.deserialize(bytes);
184 }
185 }).build();
186 }
187
188 public boolean add(PacketRequest request) {
189 if (requests.putIfAbsent(request, true) == null) {
190 return true;
191 }
192 return false;
193 }
194
195 public boolean remove(PacketRequest request) {
196 if (requests.remove(request) == null) {
197 return false;
198 }
199 return true;
200 }
201
202 public Set<PacketRequest> requests() {
203 return requests.keySet();
204 }
205
206 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700207}