blob: bab5cdf42e826de03e2a477e79f81c63aeaec582 [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Jonathan Hart4f60f982014-10-27 08:11:17 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Brian O'Connor21b028e2015-10-08 22:50:02 -070018import com.google.common.collect.ImmutableSet;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070019import com.google.common.collect.Lists;
Brian O'Connor21b028e2015-10-08 22:50:02 -070020import com.google.common.collect.Sets;
Jonathan Hart4f60f982014-10-27 08:11:17 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080027import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070031import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.net.packet.OutboundPacket;
33import org.onosproject.net.packet.PacketEvent;
34import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070035import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080036import org.onosproject.net.packet.PacketStore;
37import org.onosproject.net.packet.PacketStoreDelegate;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.store.cluster.messaging.MessageSubject;
41import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.serializers.KryoSerializer;
alshabib42947782015-03-31 14:59:06 -070043import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska40e63e62015-10-13 16:16:20 -070044import org.onosproject.store.service.ConsistentMapException;
alshabib42947782015-03-31 14:59:06 -070045import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070047import org.slf4j.Logger;
48
Thomas Vachuska7f171b22015-08-21 12:49:08 -070049import java.util.List;
alshabib42947782015-03-31 14:59:06 -070050import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080051import java.util.concurrent.ExecutorService;
52import java.util.concurrent.Executors;
Brian O'Connor21b028e2015-10-08 22:50:02 -070053import java.util.concurrent.atomic.AtomicBoolean;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080054
55import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska40e63e62015-10-13 16:16:20 -070056import static org.onlab.util.Tools.retryable;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080057import static org.slf4j.LoggerFactory.getLogger;
58
Jonathan Hart4f60f982014-10-27 08:11:17 -070059/**
60 * Distributed packet store implementation allowing packets to be sent to
61 * remote instances.
62 */
63@Component(immediate = true)
64@Service
65public class DistributedPacketStore
66 extends AbstractStore<PacketEvent, PacketStoreDelegate>
67 implements PacketStore {
68
69 private final Logger log = getLogger(getClass());
70
Madan Jampanid61f2b12015-12-10 17:01:52 -080071 private static final int MAX_BACKOFF = 50;
Thomas Vachuska40e63e62015-10-13 16:16:20 -070072
Madan Jampani2af244a2015-02-22 13:12:01 -080073 // TODO: make this configurable.
74 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
75
Jonathan Hart4f60f982014-10-27 08:11:17 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070077 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070080 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070081
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070083 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070084
alshabib42947782015-03-31 14:59:06 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected StorageService storageService;
87
88 private PacketRequestTracker tracker;
89
Jonathan Hart4f60f982014-10-27 08:11:17 -070090 private static final MessageSubject PACKET_OUT_SUBJECT =
91 new MessageSubject("packet-out");
92
93 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
94 @Override
95 protected void setupKryoPool() {
96 serializerPool = KryoNamespace.newBuilder()
97 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080098 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
99 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700100 }
101 };
102
Madan Jampani2af244a2015-02-22 13:12:01 -0800103 private ExecutorService messageHandlingExecutor;
104
Jonathan Hart4f60f982014-10-27 08:11:17 -0700105 @Activate
106 public void activate() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700107 messageHandlingExecutor = Executors.newFixedThreadPool(
Madan Jampani2af244a2015-02-22 13:12:01 -0800108 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800109 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700110
Madan Jampani01e05fb2015-08-13 13:29:36 -0700111 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
112 SERIALIZER::decode,
113 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
114 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800115
Thomas Vachuska27bee092015-06-23 19:03:10 -0700116 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700117
Madan Jampani2af244a2015-02-22 13:12:01 -0800118 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700119 }
120
121 @Deactivate
122 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800123 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
124 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700125 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700126 log.info("Stopped");
127 }
128
129 @Override
130 public void emit(OutboundPacket packet) {
131 NodeId myId = clusterService.getLocalNode().id();
132 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
133
Jonathan Hart7466d612014-11-24 17:09:53 -0800134 if (master == null) {
135 return;
136 }
137
Jonathan Hart4f60f982014-10-27 08:11:17 -0700138 if (myId.equals(master)) {
139 notifyDelegate(new PacketEvent(Type.EMIT, packet));
140 return;
141 }
142
Madan Jampani01e05fb2015-08-13 13:29:36 -0700143 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
144 .whenComplete((r, error) -> {
145 if (error != null) {
146 log.warn("Failed to send packet-out to {}", master, error);
147 }
148 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700149 }
150
alshabib42947782015-03-31 14:59:06 -0700151 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700152 public void requestPackets(PacketRequest request) {
153 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700154 }
155
156 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700157 public void cancelPackets(PacketRequest request) {
158 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700159 }
160
161 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700162 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700163 return tracker.requests();
164 }
165
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700166 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700167
Thomas Vachuska27bee092015-06-23 19:03:10 -0700168 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700169
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700170 private PacketRequestTracker() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700171 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
172 .withName("onos-packet-requests")
Thomas Vachuska19f12292015-04-20 16:29:15 -0700173 .withPartitionsDisabled()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700174 .withSerializer(Serializer.using(KryoNamespaces.API))
175 .build();
alshabib42947782015-03-31 14:59:06 -0700176 }
177
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700178 private void add(PacketRequest request) {
179 AtomicBoolean firstRequest =
Madan Jampanid61f2b12015-12-10 17:01:52 -0800180 retryable(this::addInternal, ConsistentMapException.ConcurrentModification.class,
181 Integer.MAX_VALUE, MAX_BACKOFF).apply(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700182 if (firstRequest.get() && delegate != null) {
183 // The instance that makes the first request will push to all devices
184 delegate.requestPackets(request);
185 }
186 }
187
188 private AtomicBoolean addInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700189 AtomicBoolean firstRequest = new AtomicBoolean(false);
190 requests.compute(request.selector(), (s, existingRequests) -> {
191 if (existingRequests == null) {
192 firstRequest.set(true);
193 return ImmutableSet.of(request);
194 } else if (!existingRequests.contains(request)) {
195 return ImmutableSet.<PacketRequest>builder()
196 .addAll(existingRequests)
197 .add(request)
198 .build();
199 } else {
200 return existingRequests;
201 }
202 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700203 return firstRequest;
204 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700205
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700206 private void remove(PacketRequest request) {
207 AtomicBoolean removedLast =
Madan Jampanid61f2b12015-12-10 17:01:52 -0800208 retryable(this::removeInternal, ConsistentMapException.ConcurrentModification.class,
209 Integer.MAX_VALUE, MAX_BACKOFF).apply(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700210 if (removedLast.get() && delegate != null) {
211 // The instance that removes the last request will remove from all devices
212 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700213 }
alshabib42947782015-03-31 14:59:06 -0700214 }
215
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700216 private AtomicBoolean removeInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700217 AtomicBoolean removedLast = new AtomicBoolean(false);
218 requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
219 if (existingRequests.contains(request)) {
220 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
221 newRequests.remove(request);
222 if (newRequests.size() > 0) {
223 return ImmutableSet.copyOf(newRequests);
224 } else {
225 removedLast.set(true);
226 return null;
227 }
228 } else {
229 return existingRequests;
230 }
231 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700232 return removedLast;
alshabib42947782015-03-31 14:59:06 -0700233 }
234
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700235 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700236 List<PacketRequest> list = Lists.newArrayList();
237 requests.values().forEach(v -> list.addAll(v.value()));
238 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
239 return list;
alshabib42947782015-03-31 14:59:06 -0700240 }
alshabib42947782015-03-31 14:59:06 -0700241 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700242}