目录
示例
大致流程
create
new MonoCreate
subscribe
new LambdaMonoSubscriber
monoCreate.subscribe
accept
success
onNext
时序图
类图
数据发布者
MonoCreate
数据订阅者
LambdaMonoSubscriber
订阅的消息体
DefaultMonoSink
本篇文章我们来研究如何将现有异步 API(如回调式接口)适配到 Reactor 的响应式流中。
默认情况下,Mono.create的代码块执行在订阅时的线程上,但如果在该代码块中启动其他线程或使用异步API,那么数据生产就会变成异步的。示例如下所示:
示例
Mono<String> mono = Mono.create(sink -> { // 模拟一个异步API操作 new Thread(() -> { try { Thread.sleep(1000); // 模拟耗时操作 log.info("success"); sink.success("Hello, World!"); // 成功时发射数据 } catch (InterruptedException e) { sink.error(e); // 发生错误时发射错误信号 } }).start(); }); log.info("main start"); mono.subscribe(x -> log.info("main finish")); Thread.sleep(5000); |
在这里,通过Mono.create模拟一个异步API操作,API操作成功后,调用sink.success("Hello, World!")进行数据发布者发送数据,从而触发数据的订阅。
接下来,让我们一起看看程序的流程是怎么处理的。
点击create()方法,如下所示:
大致流程
create
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) { return onAssembly(new MonoCreate<>(callback)); } |
在这里,new一个MonoCreate对象并返回。
点击MonoCreate,如下所示:
new MonoCreate
final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> { static final Disposable TERMINATED = OperatorDisposables.DISPOSED; static final Disposable CANCELLED = Disposables.disposed(); final Consumer<MonoSink<T>> callback; MonoCreate(Consumer<MonoSink<T>> callback) { this.callback = callback; } |
在这里,将create()方法的回调接口参数赋值给callback属性。因此,Mono.create的参数就作为数据发布者的一个属性信息了。
点击示例里的mono.subscribe(),如下所示:
subscribe
public final Disposable subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) { return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer, completeConsumer, null, initialContext)); } |
在这里,new一个LambdaMonoSubscriber对象,如下所示:
new LambdaMonoSubscriber
LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) { this.consumer = consumer; this.errorConsumer = errorConsumer; this.completeConsumer = completeConsumer; this.subscriptionConsumer = subscriptionConsumer; this.initialContext = initialContext == null ? Context.empty() : initialContext; } |
在这里,将subscribe的回调接口参数赋值给consumer 属性,因此,mono.subscribe的参数就作为数据消费者的属性了。
点击上一步的subscribeWith()方法,如下所示:
monoCreate.subscribe
@Override public void subscribe(CoreSubscriber<? super T> actual) { DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual); actual.onSubscribe(emitter); try { callback.accept(emitter); } catch (Throwable ex) { emitter.error(Operators.onOperatorError(ex, actual.currentContext())); } } |
在这里,首先调用了数据消费者的onSubscribe()方法,这个与《spring响应式编程系列:总体流程》一样。
另外,调用了callback.accept()方法,也就是Mono.create()的回调接口参数。
accept
Mono<String> mono = Mono.create(sink -> { // 模拟一个异步操作 new Thread(() -> { try { Thread.sleep(1000); // 模拟耗时操作 log.info("success"); sink.success("Hello, World!"); // 成功时发射数据 } catch (InterruptedException e) { sink.error(e); // 发生错误时发射错误信号 } }).start(); }); |
在这里,模拟了耗时操作,然后调用sink.success()方法。
通常,可以将sink对象保存在线程共享环境里,等其它的业务操作执行完成后,再调用sink.success()方法,即可发射数据发布者数据,从而触发消费者订阅。
点击sink.success(),如下所示:
success
public void success(@Nullable T value) { ... ... for (; ; ) { int s = state; if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) { Operators.onNextDropped(value, actual.currentContext()); return; } if (s == HAS_REQUEST_NO_VALUE) { if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) { try { actual.onNext(value); actual.onComplete(); } catch (Throwable t) { actual.onError(t); } finally { disposeResource(false); } } else { Operators.onNextDropped(value, actual.currentContext()); } return; } ... ... } } |
在这里,调用了数据订阅者的onNext()方法,如下所示:
onNext
public final void onNext(T x) { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { Operators.onNextDropped(x, this.initialContext); return; } if (consumer != null) { try { consumer.accept(x); } catch (Throwable t) { Exceptions.throwIfFatal(t); s.cancel(); doError(t); } } if (completeConsumer != null) { try { completeConsumer.run(); } catch (Throwable t) { Operators.onErrorDropped(t, this.initialContext); } } } |
时序图

- 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
- Mono和MonoCreate是数据发布者,LambdaMonoSubscriber是数据订阅者,DefaultMonoSink是订阅的消息体;
- 不同点在于,DefaultMonoSink可以通过示例里的Mono.create暴露给业务侧,业务侧的相关业务执行完成之后,可以通过调用该对象success方法,来触发订阅者的回调函数。
类图
数据发布者
MonoCreate

MonoCreate与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。
不同点在于,该数据发布者多了一个属性,如下所示:
final Consumer<MonoSink<T>> callback;
该属性是一个可以接收所订阅消息体(类型为MonoSink<T>)参数的回调函数,在这里可以将该消息体与对应的业务建立绑定关系,为后续业务执行结束后的回调做准备。
数据订阅者
LambdaMonoSubscriber
LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。
订阅的消息体
DefaultMonoSink

DefaultMonoSink与《spring响应式编程系列:总体流程》介绍的类似,都实现了Subscription接口。
不同点在于,DefaultMonoSink实现了MonoSink接口,该接口提供了供业务侧调用 的接口方法,如下所示:
void success(@Nullable T value);
业务侧的相关业务执行完成之后,可以通过调用该接口方法,来触发订阅者的回调函数。