本文源码基于: JDK13
Flow 官方注释翻译 一些接口和静态方法,为了建立流式组件, Publisher
生成元素,被一个或者多个Subscriber
消费,每一个Subscriber
被Subscription
管理.
接口介绍: reactive-streams . 他们适用于并发和分布式的环境. 所有的方法都定义为吴晓的单向消息风格.
通信依赖于一个流的简单形式控制. 他可以用来避免在push
类型的系统中的资源管理问题.
示例:
一个Flow.Publisher
通常定义了他自己的Subscription
实现,在subscribe
方法中创建一个,然后叫他交给Flow.Subscriber
。
桶异步的发布消息,通常使用一个线程池. 下面是一个简单的发布者,仅仅发布一个TRUE
给单个的订阅者. 因为订阅者只收到一个简单的元素,这个类不需要使用缓冲以及 顺序控制.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class OneShotPublisher implements Publisher <Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); private boolean subscribed; public synchronized void subscribe (Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException ()); else { subscribed = true ; subscriber.onSubscribe(new OneShotSubscription (subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this .subscriber = subscriber; this .executor = executor; } public synchronized void request (long n) { if (!completed) { completed = true ; if (n <= 0 ) { IllegalArgumentException ex = new IllegalArgumentException (); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel () { completed = true ; if (future != null ) future.cancel(false ); } } }
这是一个很简单的应用场景,单个的发布者发布消息给单个的消费者.
一个Flow.Subscriber
安排元素的请求和处理. 元素在调用request
之前不会被发布, 但是多个元素可能被request
.
很多Subscriber
的实现可以按照下面这种风格管理元素,缓冲区大小通常为1个单步,更大的缓冲区大小通常允许更加高效的重叠处理. 同时进行更少的通信.
比如给定数量为64,则未完成的请求总数将保持在32-64之间. 因为Subscriber
方法的调用是严格有序的,不需要这些方法使用锁或者volatile
除非订阅服务器维护了多个订阅.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class SampleSubscriber <T> implements Subscriber <T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this .bufferSize = bufferSize; this .consumer = consumer; } public void onSubscribe (Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2 ; (this .subscription = subscription).request(initialRequestSize); } public void onNext (T item) { if (--count <= 0 ) subscription.request(count = bufferSize - bufferSize / 2 ); consumer.accept(item); } public void onError (Throwable ex) { ex.printStackTrace(); } public void onComplete () {} }
defaultBufferSize
的默认值通常提供一个有用的七点,用于根据预期的速率,资源使用情况选择Flow组件中的请求大小和容量. 或者,当不需要使用流式控制时,订阅者可以初始化无界的队列集合.
1 2 3 4 5 6 7 8 9 class UnboundedSubscriber <T> implements Subscriber <T> { public void onSubscribe (Subscription subscription) { subscription.request(Long.MAX_VALUE); } public void onNext (T item) { use(item); } public void onError (Throwable ex) { ex.printStackTrace(); } public void onComplete () {} void use (T item) { ... } }
源码 Publisher 发布者 1 2 3 public static interface Publisher <T> { public void subscribe (Subscriber<? super T> subscriber) ; }
定义了向Publisher
中添加一个订阅者.
Subscriber 订阅者 1 2 3 4 5 6 7 8 9 10 public static interface Subscriber <T> { public void onSubscribe (Subscription subscription) ; public void onNext (T item) ; public void onError (Throwable throwable) ; public void onComplete () ; }
订阅者的接口,分别定义了:
onSubscribe 添加一个订阅 TODO 不对
onNext 处理一个元素
onError 出错
onComplete 完成
Subscription 订阅 发布者和订阅者之间链接的消息管理器.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static interface Subscription { public void request (long n) ; public void cancel () ; }
request 添加给定数量的元素
cancel 取消
Processor 同时实现了生产者和消费者的一个组件类~.
SubmissionPublisher 官方注释翻译 一个Flow.Publisher
, 异步的提交非空元素给他的订阅者,知道订阅者关闭. 每一个订阅者按照相同的顺序,接受新提交的元素.除非遇到异常.SubmissionPublisher
允许元素生成以兼容reactive-streams
, 发布者依赖于dop或者阻塞来进行流的控制.
SubmissionPublisher
使用线程池提交给他的订阅者. 线程池的选择根据它的使用场.
如果提交的元素在独立的线程中运行,且订阅者的数量可以预估, 那可以使用Executors.newFixedThreadPool
. 否则的话, 默认使用的是ForkJoinPoll.commonPool
.
缓冲区允许生产者和消费者暂时性的以不同的速率运行. 每个订阅者使用独立的缓冲区. 缓冲区在第一次使用时重建以及根据需要进行扩容.
request
的调用不直接导致缓冲区的扩容. 但是如果为填充的请求超过最大容量,则有饱和的风险. Flow.defaultBufferSize
提供了一个容量的七点,基于期望的速度,资源和使用情况.
发布方法支持关于缓冲区饱和时的不同策略. submit
代码阻塞知道资源可用. 这是最简单的策略,但是最慢. offer
方法可能丢弃元素,但是提供了插入处理然后重试的机会.
如果一些订阅者的方法抛出异常了,他的订阅会被取消. 如果在构造方法中提交了一个handler
,onNext
方法如果发生了异常,会调用该处理方法,但是onSubscribe``OnError
和OnComplete
方法是不记录和处理异常的.
如果提交到线程池发生了RejectedExecutionException
或者其他的一些运行时异常,或者一个丢弃处理器抛出了一个异常.不是全部的订阅者能够接收到发布的元素.
consume
方法简化了对一些常见情况的支持,在这种情况下,订阅者的唯一操作是使用提供的函数请求和处理所有项.
这个类还可以作为生成项的子类的一个基础,并使用这个类中的方法来发布他们。 比如:
这里有一个周期性发布发布元素的类.(实际上,您可以添加方法来独立的启动和停止,在发布者之间共享线程池等等,或者使用SubmissionPublisher
作为一个组件而不是超类.)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class PeriodicPublisher <T> extends SubmissionPublisher <T> { final ScheduledFuture<?> periodicTask; final ScheduledExecutorService scheduler; PeriodicPublisher(Executor executor, int maxBufferCapacity, Supplier<? extends T > supplier, long period, TimeUnit unit) { super (executor, maxBufferCapacity); scheduler = new ScheduledThreadPoolExecutor (1 ); periodicTask = scheduler.scheduleAtFixedRate( () -> submit(supplier.get()), 0 , period, unit); } public void close () { periodicTask.cancel(false ); scheduler.shutdown(); super .close(); } }
这里有一个Flow.Processor
的实现例子. 它使用单步请求他的发布者, 适应性更强的版本可以使用提交返回的延迟及其他方法来监控流.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class TransformProcessor <S,T> extends SubmissionPublisher <T> implements Flow .Processor<S,T> { final Function<? super S, ? extends T > function; Flow.Subscription subscription; TransformProcessor(Executor executor, int maxBufferCapacity, Function<? super S, ? extends T > function) { super (executor, maxBufferCapacity); this .function = function; } public void onSubscribe (Flow.Subscription subscription) { (this .subscription = subscription).request(1 ); } public void onNext (S item) { subscription.request(1 ); submit(function.apply(item)); } public void onError (Throwable ex) { closeExceptionally(ex); } public void onComplete () { close(); } }
简直晦涩难懂。。。翻译之后更加难懂了.
这里强烈推荐下这篇文章,我看完清晰了许多:
Java9 reactive stream
源码简介 SubmissionPublisher 发布者功能 这个类也是最外层的类.
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 BufferedSubscription<T> clients; volatile boolean closed;volatile Throwable closedException;final Executor executor;final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;final int maxBufferCapacity;
一个发布者可以被多个订阅者订阅,这些订阅者使用一个链表进行保存. 此外记录了当前发布者的一些状态,具体在注释里.
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public SubmissionPublisher (Executor executor, int maxBufferCapacity, BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) { if (executor == null ) throw new NullPointerException (); if (maxBufferCapacity <= 0 ) throw new IllegalArgumentException ("capacity must be positive" ); this .executor = executor; this .onNextHandler = handler; this .maxBufferCapacity = roundCapacity(maxBufferCapacity); } public SubmissionPublisher (Executor executor, int maxBufferCapacity) { this (executor, maxBufferCapacity, null ); } public SubmissionPublisher () { this (ASYNC_POOL, Flow.defaultBufferSize(), null ); }
进行参数校验后进行赋值操作.
subscribe 订阅方法 这是作为发布者接口的实现方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public void subscribe (Subscriber<? super T> subscriber) { if (subscriber == null ) throw new NullPointerException (); int max = maxBufferCapacity; Object[] array = new Object [max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; BufferedSubscription<T> subscription = new BufferedSubscription <T>(subscriber, executor, onNextHandler, array, max); synchronized (this ) { if (!subscribed) { subscribed = true ; owner = Thread.currentThread(); } for (BufferedSubscription<T> b = clients, pred = null ;;) { if (b == null ) { Throwable ex; subscription.onSubscribe(); if ((ex = closedException) != null ) subscription.onError(ex); else if (closed) subscription.onComplete(); else if (pred == null ) clients = subscription; else pred.next = subscription; break ; } BufferedSubscription<T> next = b.next; if (b.isClosed()) { b.next = null ; if (pred == null ) clients = next; else pred.next = next; } else if (subscriber.equals(b.subscriber)) { b.onError(new IllegalStateException ("Duplicate subscribe" )); break ; } else pred = b; b = next; } } }
首先根据当前的订阅者构造订阅令牌
找到链表的尾部,将当前订阅者插入
之后调用订阅令牌的OnSubscribe
方法. 稍后联系订阅者及令牌的代码一起看 .
submit 提交元素 由发布者发布 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public int submit (T item) { return doOffer(item, Long.MAX_VALUE, null ); } private int doOffer (T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) { if (item == null ) throw new NullPointerException (); int lag = 0 ; boolean complete, unowned; synchronized (this ) { Thread t = Thread.currentThread(), o; BufferedSubscription<T> b = clients; if ((unowned = ((o = owner) != t)) && o != null ) owner = null ; if (b == null ) complete = closed; else { complete = false ; boolean cleanMe = false ; BufferedSubscription<T> retries = null , rtail = null , next; do { next = b.next; int stat = b.offer(item, unowned); if (stat == 0 ) { b.nextRetry = null ; if (rtail == null ) retries = b; else rtail.nextRetry = b; rtail = b; } else if (stat < 0 ) cleanMe = true ; else if (stat > lag) lag = stat; } while ((b = next) != null ); if (retries != null || cleanMe) lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); } } if (complete) throw new IllegalStateException ("Closed" ); else return lag; }
ConsumerSubscriber 订阅者实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 static final class ConsumerSubscriber <T> implements Subscriber <T> { final CompletableFuture<Void> status; final Consumer<? super T> consumer; Subscription subscription; ConsumerSubscriber(CompletableFuture<Void> status, Consumer<? super T> consumer) { this .status = status; this .consumer = consumer; } public final void onSubscribe (Subscription subscription) { this .subscription = subscription; status.whenComplete((v, e) -> subscription.cancel()); if (!status.isDone()) subscription.request(Long.MAX_VALUE); } public final void onError (Throwable ex) { status.completeExceptionally(ex); } public final void onComplete () { status.complete(null ); } public final void onNext (T item) { try { consumer.accept(item); } catch (Throwable ex) { subscription.cancel(); status.completeExceptionally(ex); } } }
这个类比较简单,因为没有设计具体的业务实现,只是实现了接受令牌,处理错误,完成,以及在每次接收到发布者发的消息之后,调用初始化时的Consumer进行消费即可》
BufferedSubscription 订阅令牌实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 long timeout; int head; int tail; final int maxCapacity; volatile int ctl; Object[] array; final Subscriber<? super T> subscriber;final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;Executor executor; Thread waiter; Throwable pendingError; BufferedSubscription<T> next; BufferedSubscription<T> nextRetry; @jdk .internal.vm.annotation.Contended("c" ) volatile long demand; @jdk .internal.vm.annotation.Contended("c" )volatile int waiting; static final int CLOSED = 0x01 ; static final int ACTIVE = 0x02 ; static final int REQS = 0x04 ; static final int ERROR = 0x08 ; static final int COMPLETE = 0x10 ; static final int RUN = 0x20 ; static final int OPEN = 0x40 ; static final long INTERRUPTED = -1L ;
这个实际上就是发布者中保存的订阅实现,是链表节点.
array 保存了当前订阅令牌中的消息
next 实现了链表节点的下一个节点指针
offer 接受消息 在发布者中,消息通过内部链表节点的offer来进行发布,也就是这里了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 final int offer (T item, boolean unowned) { Object[] a; int stat = 0 , cap = ((a = array) == null ) ? 0 : a.length; int t = tail, i = t & (cap - 1 ), n = t + 1 - head; if (cap > 0 ) { boolean added; if (n >= cap && cap < maxCapacity) added = growAndOffer(item, a, t); else if (n >= cap || unowned) added = QA.compareAndSet(a, i, null , item); else { QA.setRelease(a, i, item); added = true ; } if (added) { tail = t + 1 ; stat = n; } } return startOnOffer(stat); } final int startOnOffer (int stat) { int c; if (((c = ctl) & (REQS | ACTIVE)) == REQS && ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0 ) tryStart(); else if ((c & CLOSED) != 0 ) stat = -1 ; return stat; } final void tryStart () { try { Executor e; ConsumerTask<T> task = new ConsumerTask <T>(this ); if ((e = executor) != null ) e.execute(task); } catch (RuntimeException | Error ex) { getAndBitwiseOrCtl(ERROR | CLOSED); throw ex; } }
总结 比较复杂,没有认真看代码,主要是了解一下大体上的实现即可.
SubmissionPublisher实现了Flow
类中定义的接口,提供了一套响应式的API. 其调用链大概是:
注意,全是异步操作
Subscriber
向Publisher
注册自己,调用Publisher.subscribe()
.
Publisher
接受注册,生成令牌,返回给Subscriber
, 调用Subscriber.onSubscribe()
.
Subscriber
通过令牌Subscription.request()
,告诉Publisher
自己需要多少消息(注意,这一步可以一次性告知最大值,也可以分批次告知).
程序通过Publisher.submit()
发布一条消息,Publisher
通过内部保存的Subscription
链表,逐个调用他们的offer
方法. 需要考虑每个订阅者需要的消息数量
Subscription
根据自己的策略,是否缓冲等,启动任务,任务中调用Subscriber.onNext
执行方法.
参考文章
完。
联系我 最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十