blob: 17b8ef82d9bf01f3391e176ed1411831ddb79d9f [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.Collection;
19import java.util.Set;
20import java.util.function.Consumer;
21import java.util.function.Function;
22
23import com.google.common.collect.ImmutableSet;
24import com.google.common.collect.Sets;
25import io.atomix.protocols.raft.proxy.RaftProxy;
26import org.onosproject.store.service.DistributedPrimitive;
27
28import static com.google.common.base.MoreObjects.toStringHelper;
29import static com.google.common.base.Preconditions.checkNotNull;
30
31/**
32 * Abstract base class for primitives that interact with Raft replicated state machines via proxy.
33 */
34public abstract class AbstractRaftPrimitive implements DistributedPrimitive {
35 private final Function<RaftProxy.State, Status> mapper = state -> {
36 switch (state) {
37 case CONNECTED:
38 return Status.ACTIVE;
39 case SUSPENDED:
40 return Status.SUSPENDED;
41 case CLOSED:
42 return Status.INACTIVE;
43 default:
44 throw new IllegalStateException("Unknown state " + state);
45 }
46 };
47
48 protected final RaftProxy proxy;
49 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
50
51 public AbstractRaftPrimitive(RaftProxy proxy) {
52 this.proxy = checkNotNull(proxy, "proxy cannot be null");
53 proxy.addStateChangeListener(this::onStateChange);
54 }
55
56 @Override
57 public String name() {
58 return proxy.name();
59 }
60
61 /**
62 * Handles a Raft session state change.
63 *
64 * @param state the updated Raft session state
65 */
66 private void onStateChange(RaftProxy.State state) {
67 statusChangeListeners.forEach(listener -> listener.accept(mapper.apply(state)));
68 }
69
70 @Override
71 public void addStatusChangeListener(Consumer<Status> listener) {
72 statusChangeListeners.add(listener);
73 }
74
75 @Override
76 public void removeStatusChangeListener(Consumer<Status> listener) {
77 statusChangeListeners.remove(listener);
78 }
79
80 @Override
81 public Collection<Consumer<Status>> statusChangeListeners() {
82 return ImmutableSet.copyOf(statusChangeListeners);
83 }
84
85 @Override
86 public String toString() {
87 return toStringHelper(this)
88 .add("proxy", proxy)
89 .toString();
90 }
91}