blob: f0f3eb5e96f86f1125ce21f64ffa29077b249dba [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Brian O'Connor21b028e2015-10-08 22:50:02 -070018import com.google.common.collect.ImmutableSet;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070019import com.google.common.collect.Lists;
Brian O'Connor21b028e2015-10-08 22:50:02 -070020import com.google.common.collect.Sets;
Jonathan Hart4f60f982014-10-27 08:11:17 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080027import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070031import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.net.packet.OutboundPacket;
33import org.onosproject.net.packet.PacketEvent;
34import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070035import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080036import org.onosproject.net.packet.PacketStore;
37import org.onosproject.net.packet.PacketStoreDelegate;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.store.cluster.messaging.MessageSubject;
41import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.serializers.KryoSerializer;
alshabib42947782015-03-31 14:59:06 -070043import org.onosproject.store.service.ConsistentMap;
44import org.onosproject.store.service.Serializer;
45import org.onosproject.store.service.StorageService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070046import org.slf4j.Logger;
47
Thomas Vachuska7f171b22015-08-21 12:49:08 -070048import java.util.List;
alshabib42947782015-03-31 14:59:06 -070049import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080050import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
Brian O'Connor21b028e2015-10-08 22:50:02 -070052import java.util.concurrent.atomic.AtomicBoolean;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080053
54import static org.onlab.util.Tools.groupedThreads;
55import static org.slf4j.LoggerFactory.getLogger;
56
Jonathan Hart4f60f982014-10-27 08:11:17 -070057/**
58 * Distributed packet store implementation allowing packets to be sent to
59 * remote instances.
60 */
61@Component(immediate = true)
62@Service
63public class DistributedPacketStore
64 extends AbstractStore<PacketEvent, PacketStoreDelegate>
65 implements PacketStore {
66
67 private final Logger log = getLogger(getClass());
68
Madan Jampani2af244a2015-02-22 13:12:01 -080069 // TODO: make this configurable.
70 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
71
Jonathan Hart4f60f982014-10-27 08:11:17 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070073 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070074
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070076 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070079 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070080
alshabib42947782015-03-31 14:59:06 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected StorageService storageService;
83
84 private PacketRequestTracker tracker;
85
Jonathan Hart4f60f982014-10-27 08:11:17 -070086 private static final MessageSubject PACKET_OUT_SUBJECT =
87 new MessageSubject("packet-out");
88
89 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
90 @Override
91 protected void setupKryoPool() {
92 serializerPool = KryoNamespace.newBuilder()
93 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080094 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
95 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070096 }
97 };
98
Madan Jampani2af244a2015-02-22 13:12:01 -080099 private ExecutorService messageHandlingExecutor;
100
Jonathan Hart4f60f982014-10-27 08:11:17 -0700101 @Activate
102 public void activate() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700103 messageHandlingExecutor = Executors.newFixedThreadPool(
Madan Jampani2af244a2015-02-22 13:12:01 -0800104 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800105 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700106
Madan Jampani01e05fb2015-08-13 13:29:36 -0700107 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
108 SERIALIZER::decode,
109 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
110 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800111
Thomas Vachuska27bee092015-06-23 19:03:10 -0700112 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700113
Madan Jampani2af244a2015-02-22 13:12:01 -0800114 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700115 }
116
117 @Deactivate
118 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800119 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
120 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700121 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700122 log.info("Stopped");
123 }
124
125 @Override
126 public void emit(OutboundPacket packet) {
127 NodeId myId = clusterService.getLocalNode().id();
128 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
129
Jonathan Hart7466d612014-11-24 17:09:53 -0800130 if (master == null) {
131 return;
132 }
133
Jonathan Hart4f60f982014-10-27 08:11:17 -0700134 if (myId.equals(master)) {
135 notifyDelegate(new PacketEvent(Type.EMIT, packet));
136 return;
137 }
138
Madan Jampani01e05fb2015-08-13 13:29:36 -0700139 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
140 .whenComplete((r, error) -> {
141 if (error != null) {
142 log.warn("Failed to send packet-out to {}", master, error);
143 }
144 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700145 }
146
alshabib42947782015-03-31 14:59:06 -0700147 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700148 public void requestPackets(PacketRequest request) {
149 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700150 }
151
152 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700153 public void cancelPackets(PacketRequest request) {
154 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700155 }
156
157 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700158 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700159 return tracker.requests();
160 }
161
alshabib42947782015-03-31 14:59:06 -0700162 private class PacketRequestTracker {
163
Thomas Vachuska27bee092015-06-23 19:03:10 -0700164 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700165
166 public PacketRequestTracker() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700167 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
168 .withName("onos-packet-requests")
Thomas Vachuska19f12292015-04-20 16:29:15 -0700169 .withPartitionsDisabled()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700170 .withSerializer(Serializer.using(KryoNamespaces.API))
171 .build();
alshabib42947782015-03-31 14:59:06 -0700172 }
173
Brian O'Connor21b028e2015-10-08 22:50:02 -0700174 public void add(PacketRequest request) {
175 AtomicBoolean firstRequest = new AtomicBoolean(false);
176 requests.compute(request.selector(), (s, existingRequests) -> {
177 if (existingRequests == null) {
178 firstRequest.set(true);
179 return ImmutableSet.of(request);
180 } else if (!existingRequests.contains(request)) {
181 return ImmutableSet.<PacketRequest>builder()
182 .addAll(existingRequests)
183 .add(request)
184 .build();
185 } else {
186 return existingRequests;
187 }
188 });
189
190 if (firstRequest.get() && delegate != null) {
191 // The instance that makes the first request will push to all devices
192 delegate.requestPackets(request);
alshabib42947782015-03-31 14:59:06 -0700193 }
alshabib42947782015-03-31 14:59:06 -0700194 }
195
Brian O'Connor21b028e2015-10-08 22:50:02 -0700196 public void remove(PacketRequest request) {
197 AtomicBoolean removedLast = new AtomicBoolean(false);
198 requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
199 if (existingRequests.contains(request)) {
200 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
201 newRequests.remove(request);
202 if (newRequests.size() > 0) {
203 return ImmutableSet.copyOf(newRequests);
204 } else {
205 removedLast.set(true);
206 return null;
207 }
208 } else {
209 return existingRequests;
210 }
211 });
212
213 if (removedLast.get() && delegate != null) {
214 // The instance that removes the last request will remove from all devices
215 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700216 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700217
alshabib42947782015-03-31 14:59:06 -0700218 }
219
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700220 public List<PacketRequest> requests() {
221 List<PacketRequest> list = Lists.newArrayList();
222 requests.values().forEach(v -> list.addAll(v.value()));
223 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
224 return list;
alshabib42947782015-03-31 14:59:06 -0700225 }
alshabib42947782015-03-31 14:59:06 -0700226 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700227}