blob: 6095fab08566824554600b100cd168f7a6831a63 [file] [log] [blame]
Madan Jampani63c659f2015-06-11 00:52:58 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.consistent.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static com.google.common.base.Preconditions.checkState;
20
21import java.util.Set;
22import java.util.function.Consumer;
23
24import org.onosproject.cluster.NodeId;
25import org.onosproject.store.service.DistributedQueue;
26import org.onosproject.store.service.DistributedQueueBuilder;
27import org.onosproject.store.service.Serializer;
28
29import com.google.common.base.Charsets;
30
31/**
32 * Default implementation of a {@code DistributedQueueBuilder}.
33 *
34 * @param <E> queue entry type
35 */
36public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
37
38 private Serializer serializer;
39 private String name;
40 private boolean persistenceEnabled = true;
41 private final DatabaseManager databaseManager;
42
43 public DefaultDistributedQueueBuilder(
44 DatabaseManager databaseManager) {
45 this.databaseManager = databaseManager;
46 }
47
48 @Override
49 public DistributedQueueBuilder<E> withName(String name) {
50 checkArgument(name != null && !name.isEmpty());
51 this.name = name;
52 return this;
53 }
54
55 @Override
56 public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
57 checkArgument(serializer != null);
58 this.serializer = serializer;
59 return this;
60 }
61
62 @Override
63 public DistributedQueueBuilder<E> withPersistenceDisabled() {
64 persistenceEnabled = false;
65 return this;
66 }
67
68 private boolean validInputs() {
69 return name != null && serializer != null;
70 }
71
72 @Override
73 public DistributedQueue<E> build() {
74 checkState(validInputs());
75 Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(name,
76 DatabaseManager.QUEUE_UPDATED_TOPIC,
77 s -> s.getBytes(Charsets.UTF_8),
78 nodes);
79 DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
80 name,
81 persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
82 serializer,
83 databaseManager.localNodeId,
84 notifyOthers);
85 databaseManager.registerQueue(queue);
86 return queue;
87 }
88}