Project Reactor
提供了很多创建Mono/Flux的静态方法,而最常用的就是Mono#create方法,通过该方法能把以前命令式的程序转化为Reactive的编程方式。
众所周知,Reactive Programming是一种Pull-Push模型,其中Pull用于实现back-pressure,push则是常见的推模型,也是reactive programming的重点(这里不再深入讲解pull/push模型两者的区别)。下面以一个常见的Pull模型迭代器Iterator来说明如何将传统代码转为Reactive的代码。
Iterator -> Flux
1 2 3 4 5 6 7 8
| Iterator it = Arrays.asList<>(1,2,3).iterator();
while(it.hasNext()) { System.out.println(it.next()); }
|
上面是一个常见的迭代器使用方式,下面看看是如何将迭代器转换成Flux的:
1 2 3 4 5 6 7 8 9 10 11 12
| Iterator it = Arrays.asList<>(1,2,3).iterator();
Flux<Integer> iteratorFlux = Flux.create(sink -> { while (it.hasNext()) { sink.next(it.next()); } sink.complete(); });
iteratorFlux.subscribe(System.out::println);
|
MonoCreate常见的两者使用方式
传统命令式编程除了Iterator的Pull模式外,通常还有Observable以及Callback这两种Push模式,下面分别举例讲讲这两种模式。
Observable -> MonoCreate
Observable原始代码举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Observable observable = new Observable() { @Override public void notifyObservers(Object arg) { setChanged(); super.notifyObservers(arg); } }; Observer first = (ob,value) -> { System.out.println("value is " + value); }; observable.addObserver(first); observable.notifyObservers("42");
observable.deleteObserver(first);
|
MonoCreate的转化示例:
1 2 3 4 5 6 7 8 9
| Mono<Object> observableMono = Mono.create(sink -> { Observer first = (ob, value) -> { sink.success(value); }; observable.addObserver(first); observable.notifyObservers("42"); sink.onDispose(() -> observable.deleteObserver(first)); }); observableMono.subscribe(v -> System.out.println("value is " + v));
|
Callback -> MonoCreate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| FutureCallback<HttpResponse> callback = new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse result) { System.out.println("Response: " + result.getStatusLine()); }
@Override public void failed(Exception ex) { System.out.println("Fail in " + ex); }
@Override public void cancelled() { System.out.println("Cancelled"); } };
CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); httpclient.start();
HttpGet request = new HttpGet("http://www.example.com/"); httpclient.execute(request, callback);
|
MonoCreate的转化示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); httpclient.start();
Mono<HttpResponse> responseMono = Mono.create(monoSink -> { CallbackHandler callbackHandler = new CallbackHandler(monoSink); HttpGet getRequest = new HttpGet("http://www.example.com/"); httpclient.execute(getRequest, callbackHandler.getResponseCallback()); }); responseMono.subscribe(response -> System.out.println("Response: " + response.getStatusLine()));
@Data static class CallbackHandler { private MonoSink monoSink; private FutureCallback<HttpResponse> responseCallback;
public CallbackHandler(MonoSink monoSink) { this.monoSink = monoSink; responseCallback = new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse result) { monoSink.success(result); }
@Override public void failed(Exception ex) { monoSink.error(ex); }
@Override public void cancelled() { monoSink.onDispose(() -> System.out.println("cancelled")); } }; } }
|
MonoSink
从前面已经可以看到,将传统代码转变为Reactive方式的关键是在于sink,在创建Mono/FluxCreate的时候,Mono/Flux都会提供相应的sink给使用方来使用。MonoSink相比FluxSink要简单的多,为了简单起见,我们先从MonoSink来了解sink的运行原理(FluxSink会专门另开一篇来说明)。下面就来探探Mono下的MonoSink究竟到底是什么。
再深入MonoSink之前,我们先来看看MonoCreate是怎么使用MonoSink的,对于Reactor来说,所有的入口都是subscribe
方法,所以先来看看MonoCreate的subscribe方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void subscribe(CoreSubscriber<? super T> actual) { DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);
actual.onSubscribe(emitter);
try { callback.accept(emitter); } catch (Throwable ex) { emitter.error(Operators.onOperatorError(ex, actual.currentContext())); } }
|
从上面的源代码可以看出,整个MonoCreate订阅过程很简单,主要是分为三个步骤:
- 创建DefaultMonoSink (通过这一步可以看出,一个Subscriber是独占一个MonoSink的)
- 实现Subscriber的onSubscribe的方法
- 调用Mono#create的构造器函数
以上三个步骤是从整体视角来看的,我们再进一步进入DefaultMonoSink,以它的内部视角,来看看到底作为signal emitter的MonoSink做了些什么。
MonoSink 内部状态
MonoSink内部主要有4个状态:
1 2 3 4 5
| volatile int state;
static final int NO_REQUEST_HAS_VALUE = 1; static final int HAS_REQUEST_NO_VALUE = 2; static final int HAS_REQUEST_HAS_VALUE = 3;
|
这三个状态主要取决于request和success(或者error)的调用时机,调用了request方法则会是HAS_REQUEST
,调用了success(或者error)方法则会是HAS_VALUE
,其中request方法调用是由Subscriber#onSubscribe调用的,success或者error则是由具体使用者来调用的,如Callback。由于success或者error调用时机往往不可能确定(通常是异步的),所以才产生了上述4种状态。
以同步的角度思考,通常是先调用request然后再调用success或者error方法,其中success会对应调用Subscriber的onNext与onComplete方法,error方法则会调用对应的Subscriber#onError方法。但事情往往没这么简单,就如前面提到的,request方法与success/error方法是乱序的,很有可能在request的时候,success/error方法已经调用结束了。为了解决这个问题,每个方法都引入了for-loop加CAS的多线程操作,变得相对复杂了,但只要知道其内部原理,再复杂的代码看起来就都有线索了,下面以request方法为例,来讲讲是MonoSink是如何解决多线程问题的。
MonoSink request方法解释
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public void request(long n) { if (Operators.validate(n)) { LongConsumer consumer = requestConsumer; if (consumer != null) { consumer.accept(n); } for (; ; ) { int s = state; if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) { return; } if (s == NO_REQUEST_HAS_VALUE) { if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) { try { actual.onNext(value); actual.onComplete(); } finally { disposeResource(false); } } return; } if (STATE.compareAndSet(this, s, HAS_REQUEST_NO_VALUE)) { return; } } } }
|
MonoSink回调方法
MonoSink除了request、success、error方法外,还提供了几个回调函数,以供使用者使用,主要有:
1 2 3 4 5 6 7 8
| MonoSink<T> onRequest(LongConsumer consumer);
MonoSink<T> onCancel(Disposable d);
MonoSink<T> onDispose(Disposable d);
|
这里简单讲一下Reactor的代码命名规范,对于回调函数都是以onXXX方式命名,注意调用该onXXX方式的时候,并不是直接调用,而只是传入该回调方法,待对应的事件信号发生时,才会真的被调用。这也是声明式编程的一个特色,先声明再执行。
总结
本文首先描述了传统命令式的代码如何转换为Reactive方式的代码,然后就其内部MonoSink就行了深入的了解,重点讲解了其实现形式,通过对MonoSink的剖析,能够更具体的对Mono整体的使用方式的了解。