blob: 027378aaa884f95594ee1beed808d30d5588eb2c [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Jonathan Hart4f60f982014-10-27 08:11:17 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Thomas Vachuska27bee092015-06-23 19:03:10 -070018import com.google.common.collect.ImmutableSet;
Jonathan Hart4f60f982014-10-27 08:11:17 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080025import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070029import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.net.packet.OutboundPacket;
31import org.onosproject.net.packet.PacketEvent;
32import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070033import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.net.packet.PacketStore;
35import org.onosproject.net.packet.PacketStoreDelegate;
36import org.onosproject.store.AbstractStore;
37import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.ClusterMessage;
39import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
40import org.onosproject.store.cluster.messaging.MessageSubject;
41import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.serializers.KryoSerializer;
alshabib42947782015-03-31 14:59:06 -070043import org.onosproject.store.service.ConsistentMap;
44import org.onosproject.store.service.Serializer;
45import org.onosproject.store.service.StorageService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070046import org.onosproject.store.service.Versioned;
Jonathan Hart4f60f982014-10-27 08:11:17 -070047import org.slf4j.Logger;
48
Thomas Vachuska27bee092015-06-23 19:03:10 -070049import java.util.HashSet;
alshabib42947782015-03-31 14:59:06 -070050import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080051import java.util.concurrent.ExecutorService;
52import java.util.concurrent.Executors;
53
54import static org.onlab.util.Tools.groupedThreads;
55import static org.slf4j.LoggerFactory.getLogger;
56
Jonathan Hart4f60f982014-10-27 08:11:17 -070057/**
58 * Distributed packet store implementation allowing packets to be sent to
59 * remote instances.
60 */
61@Component(immediate = true)
62@Service
63public class DistributedPacketStore
64 extends AbstractStore<PacketEvent, PacketStoreDelegate>
65 implements PacketStore {
66
67 private final Logger log = getLogger(getClass());
68
Madan Jampani2af244a2015-02-22 13:12:01 -080069 // TODO: make this configurable.
70 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
71
Jonathan Hart4f60f982014-10-27 08:11:17 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070073 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070074
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070076 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070079 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070080
alshabib42947782015-03-31 14:59:06 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected StorageService storageService;
83
84 private PacketRequestTracker tracker;
85
Jonathan Hart4f60f982014-10-27 08:11:17 -070086 private static final MessageSubject PACKET_OUT_SUBJECT =
87 new MessageSubject("packet-out");
88
89 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
90 @Override
91 protected void setupKryoPool() {
92 serializerPool = KryoNamespace.newBuilder()
93 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080094 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
95 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070096 }
97 };
98
Madan Jampani2af244a2015-02-22 13:12:01 -080099 private ExecutorService messageHandlingExecutor;
100
Jonathan Hart4f60f982014-10-27 08:11:17 -0700101 @Activate
102 public void activate() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700103 messageHandlingExecutor = Executors.newFixedThreadPool(
Madan Jampani2af244a2015-02-22 13:12:01 -0800104 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800105 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700106
Thomas Vachuskaff965232015-03-17 14:10:52 -0700107 communicationService.addSubscriber(PACKET_OUT_SUBJECT,
108 new InternalClusterMessageHandler(),
109 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800110
Thomas Vachuska27bee092015-06-23 19:03:10 -0700111 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700112
Madan Jampani2af244a2015-02-22 13:12:01 -0800113 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700114 }
115
116 @Deactivate
117 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800118 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
119 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700120 log.info("Stopped");
121 }
122
123 @Override
124 public void emit(OutboundPacket packet) {
125 NodeId myId = clusterService.getLocalNode().id();
126 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
127
Jonathan Hart7466d612014-11-24 17:09:53 -0800128 if (master == null) {
129 return;
130 }
131
Jonathan Hart4f60f982014-10-27 08:11:17 -0700132 if (myId.equals(master)) {
133 notifyDelegate(new PacketEvent(Type.EMIT, packet));
134 return;
135 }
136
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800137 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700138 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800139 // error log: log.warn("Failed to send packet-out to {}", master);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700140 }
141
alshabib42947782015-03-31 14:59:06 -0700142 @Override
143 public boolean requestPackets(PacketRequest request) {
144 return tracker.add(request);
145 }
146
147 @Override
Thomas Vachuska27bee092015-06-23 19:03:10 -0700148 public boolean cancelPackets(PacketRequest request) {
149 return tracker.remove(request);
150 }
151
152 @Override
alshabib42947782015-03-31 14:59:06 -0700153 public Set<PacketRequest> existingRequests() {
154 return tracker.requests();
155 }
156
Jonathan Hart4f60f982014-10-27 08:11:17 -0700157 /**
158 * Handles incoming cluster messages.
159 */
160 private class InternalClusterMessageHandler implements ClusterMessageHandler {
161 @Override
162 public void handle(ClusterMessage message) {
163 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
164 log.warn("Received message with wrong subject: {}", message);
165 }
166
167 OutboundPacket packet = SERIALIZER.decode(message.payload());
168 notifyDelegate(new PacketEvent(Type.EMIT, packet));
169 }
170 }
171
alshabib42947782015-03-31 14:59:06 -0700172 private class PacketRequestTracker {
173
Thomas Vachuska27bee092015-06-23 19:03:10 -0700174 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700175
176 public PacketRequestTracker() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700177 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
178 .withName("onos-packet-requests")
Thomas Vachuska19f12292015-04-20 16:29:15 -0700179 .withPartitionsDisabled()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700180 .withSerializer(Serializer.using(KryoNamespaces.API))
181 .build();
alshabib42947782015-03-31 14:59:06 -0700182 }
183
184 public boolean add(PacketRequest request) {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700185 Versioned<Set<PacketRequest>> old = requests.get(request.selector());
186 if (old != null && old.value().contains(request)) {
187 return false;
alshabib42947782015-03-31 14:59:06 -0700188 }
Thomas Vachuska27bee092015-06-23 19:03:10 -0700189 // FIXME: add retry logic using a random delay
190 Set<PacketRequest> newSet = new HashSet<>();
191 newSet.add(request);
192 if (old == null) {
193 return requests.putIfAbsent(request.selector(), newSet) == null;
194 }
195 newSet.addAll(old.value());
196 return requests.replace(request.selector(), old.version(), newSet);
alshabib42947782015-03-31 14:59:06 -0700197 }
198
199 public boolean remove(PacketRequest request) {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700200 Versioned<Set<PacketRequest>> old = requests.get(request.selector());
201 if (old == null || !old.value().contains(request)) {
alshabib42947782015-03-31 14:59:06 -0700202 return false;
203 }
Thomas Vachuska27bee092015-06-23 19:03:10 -0700204 // FIXME: add retry logic using a random delay
205 Set<PacketRequest> newSet = new HashSet<>(old.value());
206 newSet.remove(request);
207 if (newSet.isEmpty()) {
208 return requests.remove(request.selector(), old.version());
209 }
210 return requests.replace(request.selector(), old.version(), newSet);
alshabib42947782015-03-31 14:59:06 -0700211 }
212
213 public Set<PacketRequest> requests() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700214 ImmutableSet.Builder<PacketRequest> builder = ImmutableSet.builder();
215 requests.values().forEach(v -> builder.addAll(v.value()));
216 return builder.build();
alshabib42947782015-03-31 14:59:06 -0700217 }
218
219 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700220}