功能分析
代码示例
prefetch
delayError
源码分析
Flux#publishOn()
Flux#subscribe()
FluxPublishOn#subscribeOrReturn()
FluxPublishOn#onSubscribe()
非融合
FluxPublishOn#onNext()
FluxPublishOn#trySchedule()
FluxPublishOn#run()
FluxPublishOn#runAsync()
FluxPublishOn#checkTerminated()
FluxPublishOn#onComplete()
小结
同步队列融合
SynchronousSubscription#requestFusion()
FluxPublishOn#request()
FluxPublishOn#runSync()
小结
异步队列融合
WindowPredicateMain#requestFusion()
FluxPublishOn#onNext()
总结
功能分析相关示例源码:github.com/chentianmin…
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
在onNext()
、onComplete()
、onError()
方法进行线程切换,publishOn()
使得它下游的消费阶段异步执行。
scheduler:线程切换的调度器,Scheduler
用来生成实际执行异步任务的Worker
。
delayError:是否延时转发Error
。如果为true
,当收到上游的Error
时,会等队列中的元素消费完毕后再向下游转发Error
。否则会立即转发Error
,可能导致队列中的元素丢失。默认为true
。
prefetch:预取元素的数量,同时也是队列的容量。默认值为Queues.SMALL_BUFFER_SIZE
,该值通过配置进行修改。
/**
* 每隔delayMillis生产一个元素
*/
protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) {
return Flux.create(fluxSink -> {
IntStream.range(startInclusive, endExclusive)
.forEach(i -> {
// 同步next
sleep(delayMillis);
logInt(i, "生产");
fluxSink.next(i);
});
fluxSink.complete();
});
}
@Test
public void testPreFetch() {
delayPublishFlux(1000, 1, 5)
.doOnRequest(i -> logLong(i, "request"))
.publishOn(Schedulers.boundedElastic(), 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
每次会都向上游请求2个元素。另外还能发现,从第二个request开始,线程发生了切换。
delayError/**
* 每隔delayMillis生产一个元素,最后发送Error
*/
protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) {
return Flux.create(fluxSink -> {
IntStream.range(startInclusive, endExclusive)
.forEach(i -> {
// 同步next
sleep(delayMillis);
logInt(i, "生产");
fluxSink.next(i);
});
fluxSink.error(new RuntimeException("发布错误!"));
});
}
@Test
public void testDelayError() {
delayPublishFluxError(500, 1, 5)
.publishOn(Schedulers.boundedElastic())
// 只是为了消费慢一点
.doOnNext(i -> sleep(1000))
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
元素消费完才触发Error
!
@Test
public void testNotDelayError() {
delayPublishFluxError(500, 1, 5)
.publishOn(Schedulers.boundedElastic(), false, 256)
// 只是为了消费慢一点
.doOnNext(i -> sleep(1000))
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
元素还没消费完就触发Error
!
首先看一下publishOn()
操作符在装配阶段做了什么,直接查看Flux#publishOn()
源码。
publishOn()
装配阶段重点是创建了FluxPublishOn
对象。
接下来,我们分析订阅阶段发生了什么。一个Publisher
在订阅的时候调用的是其subscribe()
方法,因此我们继续看Flux#subscribe()
源码。
在Flux#subscribe()
方法的实现中,如果上游Publisher
是OptimizableOperator
类型,实际的Subscriber
是通过调用该InternalFluxOperator#subscribeOrReturn()
方法返回的。如果返回值为null
,直接return
。
对于publishOn()
操作符来说,装配阶段创建的FluxPublishOn
就是OptimizableOperator
类型。所以继续查看FluxPublishOn#subscribeOrReturn()
源码。
可以看到,方法返回的是PublishOnSubscriber
,它包装了原始的Subscriber
。
在后续的订阅阶段一定会调用其onSubscribe()
方法,在运行阶段一定会调用其onNext()
方法。我们先看FluxPublishOn#onSubscribe()
源码。
在onSubscribe()
实现中,分为同步队列融合、异步队列融合以及非融合方式处理。
如果上游的Subscription
是QueueSubscription
类型,则会进行队列融合。具体采用同步还是异步,取决于该QueueSubscription#requestFusion()
实现。
同步队列融合:复用当前队列,继续调用下游onSubscribe()
方法,但不会继续调用上游request()
方法。
异步队列融合:复用当前队列,然后继续调用下游onSubscribe()
以及上游request()
方法,请求数量是prefetch
。
非融合:创建一个新的队列,然后继续调用下游onSubscribe()
以及上游request()
方法,请求数量是prefetch
。
接下来,我们从源码角度分别介绍上述三种方式的处理逻辑,首先介绍非融合方式。
非融合先看如下代码示例,该代码会以非融合方式执行。
@Test
public void testNoFuse() {
delayPublishFlux(1000, 1, 5)
.publishOn(Schedulers.boundedElastic())
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
间隔1s生产消费元素!
在消费阶段,一定会调用FluxPublishOn#onNext()
方法。
我们重点关注非融合方式执行逻辑,其实只做了2件事:
将下发的元素添加到队列中,该队列就是onSubscribe()
阶段创建的新队列。
调用trySchedule()
方法进行调度。
继续看FluxPublishOn#trySchedule()
源码。
这里其实就是交由woker
异步执行,后续会执行FluxPublishOn.run()
方法。
在run()方法执行的时候,分为3段逻辑:
如果是输出融合,执行runBackfused()
方法。
如果是同步队列融合,执行runSync()
方法。
否则,执行runAsync()
方法。
对于当前例子,实际执行的是runAsync()
方法,继续查看其源码。
runAsync()
做的事情比较简单,就是排空队列中的元素下发给下游。同时在这里会继续调用request()
向上游请求数据,这也是前面说的从第二个request()
开始会进行线程切换的原因。
另外这里还会调用checkTerminated()
,检查终止情况。
如果delayError=true
,必须当前队列为空是才会转发Error
。如果delayError=false
,则直接转发Error
。继续查看onComplete()
方法。
如果未结束,将done
标记设置为true
,然后再次调用trySchedule()
进行调度。后续再被调度到的时候,如果队列已经排空,才会调用下游onComplete()
,触发完成。
简单总结一下非融合执行过程:
在onSubscribe()
时创建一个队列,在onNext()
时将上游下发的元素添加到队列中,然后异步排空队列中的元素,继续下发给下游。
以下代码会以同步队列融合方式执行。
@Test
public void testSyncFuse() {
Flux.just(1, 2 ,3, 4, 5)
.publishOn(Schedulers.boundedElastic())
.subscribe(this::logInt);
sleep(10000);
}
因为Flux.just()
对应的Subscription
是SynchronousSubscription
,其requestFusion()
方法实现如下:
此时返回的是SYNC
,执行同步队列融合。
前面提到过,同步队列融合会复用当前队列,继续调用下游onSubscribe()
方法,但不会继续调用上游request()
方法。
这意味着,此时FluxPublishOn#onNext()
和FluxPublishOn#onComplete()
方法并不会调用。但是FluxPublishOn#request()
依然会被下游调用到。
在request()
方法中还是会调用trySchedule()
,后续会异步调用runSync()
方法(前面已经分析了)。
对于非融合方式,trySchedule()
也会执行,只是这次调度的时候,队列中还没有数据被添加进去。
runSync()
实现上runAsync()
差不多,也是排空队列的元素,继续下发给下游。不同的点是少了request()
调用,以及取消完成控制有差异。
简单总结一下同步队列融合执行过程:
在onSubsrribe()
时直接复用上游QueueSubscription
作为队列,不会调用上游request()
请求数据,在自身request()
时异步排空队列中的元素,继续下发给下游。
以下代码会以异步队列融合方式执行。
@Test
public void testAsyncFuse() {
Flux.just(1, 2, 3, 4, 5)
.windowUntil(i -> i % 3 == 0)
.publishOn(Schedulers.boundedElastic())
.flatMap(Function.identity())
.subscribe(this::logInt);
sleep(10000);
}
因为windowUntil()
对应的Subscription
是WindowPredicateMain
,其requestFusion()
方法实现如下:
此时返回ASYNC
,执行异步队列融合。接下来再看一下FluxPublishOn#onNext()
源码。
注意,此时onNext()
方法参数是null
,表明上游并没有真正下发元素,可以将其看做是一个触发Worker
调度的信号。后续还是会异步执行runAsync()
方法,这里就不再分析了。
这其实也很容易理解:异步队列融合直接复用了上游的QueueSubscription
作为队列,真正的数据应该由这个队列下发。
简单总结一下同步队列融合执行过程:
在onSubsrribe()
时直接复用上游QueueSubscription
作为队列,在onNext()
时接收上游信号,异步排空队列中的元素,继续下发给下游。
非融合、同步队列融合、异步队列融合比较如下:
以上就是Project Reactor源码解析publishOn使用示例的详细内容,更多关于Project Reactor publishOn的资料请关注易知道(ezd.cc)其它相关文章!