Clover icon

sunshower-sdk

  1. Project Clover database Sat May 12 2018 05:15:40 UTC
  2. Package io.sunshower.sdk.channel

File ReactiveChannelSelector.java

 

Coverage histogram

../../../../img/srcFileCovDistChart9.png
14% of files have more coverage

Code metrics

6
20
6
1
70
60
10
0.5
3.33
6
1.67

Classes

Class Line # Actions
ReactiveChannelSelector 14 20 10
0.8125 81.2%
 

Contributing tests

No tests hitting this source file were found.

Source view

1    package io.sunshower.sdk.channel;
2   
3    import lombok.Synchronized;
4    import org.reactivestreams.Publisher;
5    import org.reactivestreams.Subscriber;
6   
7    import java.util.HashSet;
8    import java.util.Map;
9    import java.util.Set;
10    import java.util.concurrent.ConcurrentHashMap;
11    import java.util.concurrent.ExecutorService;
12    import java.util.concurrent.Future;
13   
 
14    public class ReactiveChannelSelector implements ChannelSelector {
15   
16    final ExecutorService service;
17    final Set<Object> active;
18    final Set<Object> inactive;
19    final Map<Object, Channel<?>> channels;
20   
 
21  7 toggle public ReactiveChannelSelector(ExecutorService service) {
22  7 channels = new ConcurrentHashMap<>();
23  7 this.service = service;
24  7 this.active = new HashSet<>();
25  7 this.inactive = new HashSet<>();
26    }
27   
 
28  2 toggle @Override
29    @Synchronized
30    public <I, T> void create(I id, Channel<T> publisher) {
31  2 if (!channels.containsKey(id)) {
32  2 final Channel<T> channel;
33  2 if(publisher instanceof ManagedChannel) {
34  1 channel = publisher;
35    } else {
36  1 channel = new ManagedChannel<>(this, id, publisher);
37    }
38  2 channels.put(id, channel);
39  2 inactive.add(id);
40    }
41    }
42   
 
43  1 toggle @Override
44    public <I, T> void create(I id, Publisher<T> publisher) {
45  1 create(id, new ManagedChannel<>(this, id, new PublisherChannel<>(publisher)));
46    }
47   
 
48  4 toggle @Override
49    @SuppressWarnings("unchecked")
50    public <T, I> Channel<T> select(I id) {
51  4 return (Channel<T>) channels.get(id);
52    }
53   
 
54  0 toggle @Synchronized
55    <I> void remove(I id) {
56  0 active.remove(id);
57  0 channels.remove(id);
58  0 inactive.remove(id);
59    }
60   
 
61  2 toggle @Synchronized
62    @SuppressWarnings("unchecked")
63    <I, T> void submit(ManagedChannel<I, T> channel) {
64  2 if (!active.contains(channel.id) && inactive.contains(channel.id)) {
65  2 inactive.remove(channel.id);
66  2 channel.setFuture((Future<T>) service.submit(channel));
67  2 active.add(channel);
68    }
69    }
70    }