当前位置: 首页 > news >正文

spring响应式编程系列:异步生产数据

目录

示例

大致流程

create

new MonoCreate

subscribe

new LambdaMonoSubscriber

monoCreate.subscribe

accept

success

onNext

时序图

类图

数据发布者

MonoCreate

数据订阅者

LambdaMonoSubscriber

订阅的消息体

DefaultMonoSink

        本篇文章我们来研究如何将现有异步 API(如回调式接口)适配到 Reactor 的响应式流中。

        默认情况下,Mono.create的代码块执行在订阅时的线程上,但如果在该代码块中启动其他线程或使用异步API,那么数据生产就会变成异步的。示例如下所示:

示例

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步API操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});
log.info("main start");
mono.subscribe(x -> log.info("main finish"));
Thread.sleep(5000);

        在这里,通过Mono.create模拟一个异步API操作,API操作成功后,调用sink.success("Hello, World!")进行数据发布者发送数据,从而触发数据的订阅。

        接下来,让我们一起看看程序的流程是怎么处理的。

        点击create()方法,如下所示:

大致流程

create

public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
    return onAssembly(new MonoCreate<>(callback));
}

        在这里,new一个MonoCreate对象并返回。

        点击MonoCreate,如下所示:

new MonoCreate

final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> {
   static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
   static final Disposable CANCELLED = Disposables.disposed();
   final Consumer<MonoSink<T>> callback;
   MonoCreate(Consumer<MonoSink<T>> callback) {
      this.callback = callback;
   }

        在这里,将create()方法的回调接口参数赋值给callback属性。因此,Mono.create的参数就作为数据发布者的一个属性信息了。

        点击示例里的mono.subscribe(),如下所示:

subscribe

public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Context initialContext) {
   return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
         completeConsumer, null, initialContext));
}

        在这里,new一个LambdaMonoSubscriber对象,如下所示:

new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Consumer<? super Subscription> subscriptionConsumer,
      @Nullable Context initialContext) {
   this.consumer = consumer;
   this.errorConsumer = errorConsumer;
   this.completeConsumer = completeConsumer;
   this.subscriptionConsumer = subscriptionConsumer;
   this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

        在这里,将subscribe的回调接口参数赋值给consumer 属性,因此,mono.subscribe的参数就作为数据消费者的属性了。

        点击上一步的subscribeWith()方法,如下所示:

monoCreate.subscribe

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
   DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);
   actual.onSubscribe(emitter);
   try {
      callback.accept(emitter);
   }
   catch (Throwable ex) {
      emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
   }
}

        在这里,首先调用了数据消费者的onSubscribe()方法,这个与《spring响应式编程系列:总体流程》一样。

        另外,调用了callback.accept()方法,也就是Mono.create()的回调接口参数。

accept

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});

        在这里,模拟了耗时操作,然后调用sink.success()方法。

      通常,可以将sink对象保存在线程共享环境里,等其它的业务操作执行完成后,再调用sink.success()方法,即可发射数据发布者数据,从而触发消费者订阅。

        点击sink.success(),如下所示:

​​​​​​​success

public void success(@Nullable T value) {

... ...
     for (; ; ) {
      int s = state;
      if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) {
         Operators.onNextDropped(value, actual.currentContext());
         return;
      }
      if (s == HAS_REQUEST_NO_VALUE) {
         if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
            try {
               actual.onNext(value);
               actual.onComplete();
            }
            catch (Throwable t) {
               actual.onError(t);
            }
            finally {
               disposeResource(false);
            }
         } else {
            Operators.onNextDropped(value, actual.currentContext());
         }
         return;
      }
      ... ...
   }
}

        在这里,调用了数据订阅者的onNext()方法,如下所示:

​​​​​​​onNext

public final void onNext(T x) {
   Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
   if (s == Operators.cancelledSubscription()) {
      Operators.onNextDropped(x, this.initialContext);
      return;
   }
   if (consumer != null) {
      try {
         consumer.accept(x);
      }
      catch (Throwable t) {
         Exceptions.throwIfFatal(t);
         s.cancel();
         doError(t);
      }
   }
   if (completeConsumer != null) {
      try {
         completeConsumer.run();
      }
      catch (Throwable t) {
         Operators.onErrorDropped(t, this.initialContext);
      }
   }
}

时序图

  1. 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
  2. Mono和MonoCreate是数据发布者,LambdaMonoSubscriber是数据订阅者,DefaultMonoSink是订阅的消息体;
  3. 不同点在于,DefaultMonoSink可以通过示例里的Mono.create暴露给业务侧,业务侧的相关业务执行完成之后,可以通过调用该对象success方法,来触发订阅者的回调函数。

​​​​​​​类图

数据发布者

MonoCreate

        MonoCreate与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。

        不同点在于,该数据发布者多了一个属性,如下所示:

        final Consumer<MonoSink<T>> callback;

        该属性是一个可以接收所订阅消息体(类型为MonoSink<T>)参数的回调函数,在这里可以将该消息体与对应的业务建立绑定关系,为后续业务执行结束后的回调做准备。

数据订阅者

LambdaMonoSubscriber

        LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。

订阅的消息体

DefaultMonoSink

        DefaultMonoSink与《spring响应式编程系列:总体流程​​​​​​​》介绍的类似,都实现了Subscription接口。

        不同点在于,DefaultMonoSink实现了MonoSink接口,该接口提供了供业务侧调用 的接口方法,如下所示:

void success(@Nullable T value);

        业务侧的相关业务执行完成之后,可以通过调用该接口方法,来触发订阅者的回调函数。

http://www.xdnf.cn/news/165259.html

相关文章:

  • 第八课四则运算 设计运算器
  • 三维重建(二十)——思路整理与第一步的进行
  • 2025上海车展| 和芯星通发布覆盖车载全场景的产品方案
  • [Windows] 易剪媒 v0.0.8 绿色版 —— 跨平台AI批量自动剪辑视频工具
  • 罗技Flow跨电脑控制
  • 三菱PLC软元件 定时器 计数器 状态继电器 编码器
  • Volcano 进阶实战 (二) - (网络拓扑/负载感知)调度
  • 深入解析Dify中的文本清洗处理器:CleanProcessor详解
  • 玩转Pygame绘图:从简单图形到炫酷精灵
  • 构造函数有哪些种类?
  • 敦普水性低温烤漆:金属涂装80℃烘烤的防护体系
  • 牛客:BM1 反转链表
  • 利用 functools.lru_cache 优化递归算法
  • GPU 加速库(CUDA/cuDNN)
  • 每日面试实录·滴滴·校招·JAVA
  • MIL、SIL、HIL与Back-to-Back测试详解:从模型到硬件的完整验证链
  • ultralytics 目标检测 混淆矩阵 背景图像 没被记录
  • docker 常用配置
  • 信息系统项目管理工程师备考计算类真题讲解十
  • 数位 DP 详解
  • Python并行计算:2.Python多线程编程:threading模块详解与守护线程实战
  • B3791 [信息与未来 2023] 电路布线
  • c++-模板
  • 2.4.5goweb项目上传到csdn的git仓库
  • 【量化交易笔记】17.多因子的线性回归模型策略
  • 提取office最强悍的软件
  • asammdf 库的文件操作和数据导出:高效管理 MDF 文件
  • 刚体运动 (位置向量 - 旋转矩阵) 笔记 1.1~1.3 (台大机器人学-林沛群)
  • 职场十二法则-马方
  • AnimateCC教学:元件旋转当中平移