blob: d4c89c93ec3913f4b53f5e8ce851bea7a56981bc [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Thomas Vachuska7f171b22015-08-21 12:49:08 -070018import com.google.common.collect.Lists;
Jonathan Hart4f60f982014-10-27 08:11:17 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080025import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070029import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.net.packet.OutboundPacket;
31import org.onosproject.net.packet.PacketEvent;
32import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070033import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.net.packet.PacketStore;
35import org.onosproject.net.packet.PacketStoreDelegate;
36import org.onosproject.store.AbstractStore;
37import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
alshabib42947782015-03-31 14:59:06 -070041import org.onosproject.store.service.ConsistentMap;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070044import org.onosproject.store.service.Versioned;
Jonathan Hart4f60f982014-10-27 08:11:17 -070045import org.slf4j.Logger;
46
Thomas Vachuska27bee092015-06-23 19:03:10 -070047import java.util.HashSet;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070048import java.util.List;
alshabib42947782015-03-31 14:59:06 -070049import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080050import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52
53import static org.onlab.util.Tools.groupedThreads;
54import static org.slf4j.LoggerFactory.getLogger;
55
Jonathan Hart4f60f982014-10-27 08:11:17 -070056/**
57 * Distributed packet store implementation allowing packets to be sent to
58 * remote instances.
59 */
60@Component(immediate = true)
61@Service
62public class DistributedPacketStore
63 extends AbstractStore<PacketEvent, PacketStoreDelegate>
64 implements PacketStore {
65
66 private final Logger log = getLogger(getClass());
67
Madan Jampani2af244a2015-02-22 13:12:01 -080068 // TODO: make this configurable.
69 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
70
Jonathan Hart4f60f982014-10-27 08:11:17 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070072 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070073
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070075 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070076
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070078 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070079
alshabib42947782015-03-31 14:59:06 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected StorageService storageService;
82
83 private PacketRequestTracker tracker;
84
Jonathan Hart4f60f982014-10-27 08:11:17 -070085 private static final MessageSubject PACKET_OUT_SUBJECT =
86 new MessageSubject("packet-out");
87
88 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
89 @Override
90 protected void setupKryoPool() {
91 serializerPool = KryoNamespace.newBuilder()
92 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080093 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
94 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070095 }
96 };
97
Madan Jampani2af244a2015-02-22 13:12:01 -080098 private ExecutorService messageHandlingExecutor;
99
Jonathan Hart4f60f982014-10-27 08:11:17 -0700100 @Activate
101 public void activate() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700102 messageHandlingExecutor = Executors.newFixedThreadPool(
Madan Jampani2af244a2015-02-22 13:12:01 -0800103 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800104 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700105
Madan Jampani01e05fb2015-08-13 13:29:36 -0700106 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
107 SERIALIZER::decode,
108 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
109 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800110
Thomas Vachuska27bee092015-06-23 19:03:10 -0700111 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700112
Madan Jampani2af244a2015-02-22 13:12:01 -0800113 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700114 }
115
116 @Deactivate
117 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800118 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
119 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700120 log.info("Stopped");
121 }
122
123 @Override
124 public void emit(OutboundPacket packet) {
125 NodeId myId = clusterService.getLocalNode().id();
126 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
127
Jonathan Hart7466d612014-11-24 17:09:53 -0800128 if (master == null) {
129 return;
130 }
131
Jonathan Hart4f60f982014-10-27 08:11:17 -0700132 if (myId.equals(master)) {
133 notifyDelegate(new PacketEvent(Type.EMIT, packet));
134 return;
135 }
136
Madan Jampani01e05fb2015-08-13 13:29:36 -0700137 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
138 .whenComplete((r, error) -> {
139 if (error != null) {
140 log.warn("Failed to send packet-out to {}", master, error);
141 }
142 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700143 }
144
alshabib42947782015-03-31 14:59:06 -0700145 @Override
146 public boolean requestPackets(PacketRequest request) {
147 return tracker.add(request);
148 }
149
150 @Override
Thomas Vachuska27bee092015-06-23 19:03:10 -0700151 public boolean cancelPackets(PacketRequest request) {
152 return tracker.remove(request);
153 }
154
155 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700156 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700157 return tracker.requests();
158 }
159
alshabib42947782015-03-31 14:59:06 -0700160 private class PacketRequestTracker {
161
Thomas Vachuska27bee092015-06-23 19:03:10 -0700162 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700163
164 public PacketRequestTracker() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700165 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
166 .withName("onos-packet-requests")
Thomas Vachuska19f12292015-04-20 16:29:15 -0700167 .withPartitionsDisabled()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700168 .withSerializer(Serializer.using(KryoNamespaces.API))
169 .build();
alshabib42947782015-03-31 14:59:06 -0700170 }
171
172 public boolean add(PacketRequest request) {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700173 Versioned<Set<PacketRequest>> old = requests.get(request.selector());
174 if (old != null && old.value().contains(request)) {
175 return false;
alshabib42947782015-03-31 14:59:06 -0700176 }
Thomas Vachuska27bee092015-06-23 19:03:10 -0700177 // FIXME: add retry logic using a random delay
178 Set<PacketRequest> newSet = new HashSet<>();
179 newSet.add(request);
180 if (old == null) {
181 return requests.putIfAbsent(request.selector(), newSet) == null;
182 }
183 newSet.addAll(old.value());
184 return requests.replace(request.selector(), old.version(), newSet);
alshabib42947782015-03-31 14:59:06 -0700185 }
186
187 public boolean remove(PacketRequest request) {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700188 Versioned<Set<PacketRequest>> old = requests.get(request.selector());
189 if (old == null || !old.value().contains(request)) {
alshabib42947782015-03-31 14:59:06 -0700190 return false;
191 }
Thomas Vachuska27bee092015-06-23 19:03:10 -0700192 // FIXME: add retry logic using a random delay
193 Set<PacketRequest> newSet = new HashSet<>(old.value());
194 newSet.remove(request);
195 if (newSet.isEmpty()) {
196 return requests.remove(request.selector(), old.version());
197 }
198 return requests.replace(request.selector(), old.version(), newSet);
alshabib42947782015-03-31 14:59:06 -0700199 }
200
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700201 public List<PacketRequest> requests() {
202 List<PacketRequest> list = Lists.newArrayList();
203 requests.values().forEach(v -> list.addAll(v.value()));
204 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
205 return list;
alshabib42947782015-03-31 14:59:06 -0700206 }
207
208 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700209}