SpringBoot3响应式编程全套-Reactor核心

目录

  • 传送门
  • 前言
  • 一、前置知识
    • 1、Lambda
    • 2、Function
    • 3、StreamAPI
    • 4、Reactive-Stream
  • 二、Reactor
    • 1、快速上手
      • 1.1、介绍
      • 1.2、依赖
    • 2、响应式编程
      • 2.1、阻塞是对资源的浪费
      • 2.2、异步可以解决问题吗?
      • 2.3、从命令式编程到响应式编程
  • 三、核心特性
    • 1、Mono和Flux
    • 2、subscribe()
    • 3、流的取消
    • 4、BaseSubscriber
    • 5、背压(Backpressure )和请求重塑(Reshape Requests)
      • 5.1、buffer:缓冲
      • 5.2、limit:限流
    • 6、以编程方式创建序列-Sink
    • 7、 handle()
    • 8、自定义线程调度
    • 9、错误处理
      • 9.1、Catch and return a static default value. 捕获异常返回一个静态默认值
      • 9.2、Catch and execute an alternative path with a fallback method.
      • 9.3、Catch and dynamically compute a fallback value. 捕获并动态计算一个返回值
      • 9.4、Catch, wrap to a BusinessException, and re-throw.
      • 9.5、Catch, log an error-specific message, and re-throw.
      • 9.6、Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
      • 9.7、忽略当前异常,仅通知记录,继续推进
    • 10、常用操作

传送门

SpringMVC的源码解析(精品)
Spring6的源码解析(精品)
SpringBoot3框架(精品)
MyBatis框架(精品)
MyBatis-Plus
SpringDataJPA
SpringCloudNetflix
SpringCloudAlibaba(精品)
Shiro
SpringSecurity
java的LOG日志框架
Activiti(敬请期待)
JDK8新特性
JDK9新特性
JDK10新特性
JDK11新特性
JDK12新特性
JDK13新特性
JDK14新特性
JDK15新特性
JDK16新特性
JDK17新特性
JDK18新特性
JDK19新特性
JDK20新特性
JDK21新特性
其他技术文章传送门入口

前言

由于面试问到的比较多,而且做java开发这块还是需要真正掌握的。
现有笔记尚硅谷雷锋阳老师的:SpringBoot3全栈指南,是我目前见过的最好笔记了。
参考视频尚硅谷雷锋阳老师的:SpringBoot零基础教程,面试&加薪必会,视频是24小时31分钟的高质量教程。
参考代码:https://gitee.com/leifengyang/spring-boot-3

最经典的20个Spring Boot面试题,95%以上会被问到,不服来战

为了防止雷锋阳老师的日志查看不到,这里分类整理一下。下面文章不定时更新

SpringBoot3核心特性-快速入门
SpringBoot3核心特性-Web开发
SpringBoot3核心特性-数据访问
SpringBoot3核心特性-基础特性
SpringBoot3核心特性-核心原理
SpringBoot3场景整合
SpringBoot3响应式编程全套-Reactor核心
SpringBoot3响应式编程全套-Spring Webflux
SpringBoot3响应式编程全套-R2DBC
SpringBoot3响应式编程全套-Spring Security Reactive

一、前置知识

在这里插入图片描述

1、Lambda

Java8语法糖:

package com.atguiggu.lambda;import java.util.*;
import java.util.function.*;
import java.util.stream.Collectors;/*** @author lfy* @Description* @create 2023-11-16 20:07*///函数式接口;只要是函数式接口就可以用Lambda表达式简化
//函数式接口: 接口中有且只有一个未实现的方法,这个接口就叫函数式接口interface MyInterface {int sum(int i, int j);
}interface MyHaha {int haha();default int heihei() {return 2;}; //默认实现
}interface My666 {void aaa(int i,int j,int k);
}@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {int hehe(int i);}//1、自己写实现类
class MyInterfaceImpl implements MyInterface {@Overridepublic int sum(int i, int j) {return i + j;}
}public class Lambda {public static void main(String[] args) {//声明一个函数BiConsumer<String,String> consumer = (a,b)->{System.out.println("哈哈:"+a+";呵呵:"+b);};consumer.accept("1","2");//声明一个函数Function<String,Integer> function = (String x) -> Integer.parseInt(x);System.out.println(function.apply("2"));Supplier<String> supplier = ()-> UUID.randomUUID().toString();String s = supplier.get();System.out.println(s);BiFunction<String,Integer,Long> biFunction = (a,b)-> 888L;Predicate<Integer> even = (t)-> t%2 ==0;//        even.test()//正向判断
//        even.negate().test(2) //反向判断System.out.println(even.negate().test(2));}public static void bbbbb(String[] args) {var names = new ArrayList<String>();names.add("Alice");names.add("Bob");names.add("Charlie");names.add("David");//比较器
//        Collections.sort(names, new Comparator<String>() {
//            @Override
//            public int compare(String o1, String o2) {
//                return o2.compareTo(o1);
//            }
//        });//直接写函数式接口就方便   (o1,o2)->o1.compareTo(o2)
//        Collections.sort(names,(o1,o2)->o1.compareTo(o2));System.out.println(names);// 类::方法; 引用类中的实例方法; 忽略lambda的完整写法Collections.sort(names,String::compareTo);System.out.println(names);new  Thread(new Runnable() {@Overridepublic void run() {System.out.println("哈哈啊");}}).start();Runnable runnable = () -> System.out.println("aaa");new Thread(runnable).start();//最佳实战://1、以后调用某个方法传入参数,这个参数实例是一个接口对象,且只定义了一个方法,就直接用lambda简化写法}/*** lambda简化函数式接口实例创建** @param args*/public static void aaaa(String[] args) {//1、自己创建实现类对象MyInterface myInterface = new MyInterfaceImpl();System.out.println(myInterface.sum(1, 2));//2、创建匿名实现类MyInterface myInterface1 = new MyInterface() {@Overridepublic int sum(int i, int j) {return i * i + j * j;}};
//        System.out.println(myInterface1.sum(2, 3));//冗余写法//3、lambda表达式:语法糖  参数列表  + 箭头 + 方法体MyInterface myInterface2 = (x, y) -> {return x * x + y * y;};System.out.println(myInterface2.sum(2, 3));//参数位置最少情况MyHaha myHaha = () -> {return 1;};MyHehe myHehe = y -> {return y * y;};MyHehe hehe2 = y -> y - 1;//完整写法如上://简化写法://1)、参数类型可以不写,只写(参数名),参数变量名随意定义;//    参数表最少可以只有一个 (),或者只有一个参数名;//2、方法体如果只有一句话,{} 可以省略MyHehe hehe3 = y -> y + 1;System.out.println(hehe3.hehe(7));//以上Lambda表达式简化了实例的创建。//总结:// 1、Lambda表达式: (参数表) -> {方法体}// 2、分辨出你的接口是否函数式接口。 函数式接口就可以lambda简化}}

2、Function

函数式接口的出入参定义:
1、有入参,无出参【消费者】: function.accept

        BiConsumer<String,String> function = (a,b)->{ //能接受两个入参System.out.println("哈哈:"+a+";呵呵:"+b);};function.accept("1","2");

2、有入参,有出参【多功能函数】: function.apply

        Function<String,Integer> function = (String x) -> Integer.parseInt(x);System.out.println(function.apply("2"));

3、无入参,无出参【普通函数】:

        Runnable runnable = () -> System.out.println("aaa");new Thread(runnable).start();

4、无入参 ,有出参【提供者】: supplier.get()

        Supplier<String> supplier = ()-> UUID.randomUUID().toString();String s = supplier.get();System.out.println(s);

java.util.function包下的所有function定义:
● Consumer: 消费者
● Supplier: 提供者
● Predicate: 断言
get/test/apply/accept调用的函数方法;

3、StreamAPI

最佳实战:以后凡是你写for循环处理数据的统一全部用StreamAPI进行替换;
Stream所有数据和操作被组合成流管道流管道组成:
● 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
● 零或多个中间操作(将一个流变形成另一个流)
● 一个终止操作(产生最终结果)

中间操作:Intermediate Operations
● filter:过滤; 挑出我们用的元素
● map: 映射: 一一映射,a 变成 b
○ mapToInt、mapToLong、mapToDouble
● flatMap:打散、散列、展开、扩维:一对多映射

filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile、

终止操作:Terminal Operation

forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator

4、Reactive-Stream

在这里插入图片描述

二、Reactor

1、快速上手

1.1、介绍

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。
Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

1.2、依赖

<dependencyManagement> <dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId> </dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId> <scope>test</scope></dependency>
</dependencies>

2、响应式编程

响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

了解历史:
● 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。
● 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
● 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
● 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:

onNext x 0..N [onError | onComplete]

2.1、阻塞是对资源的浪费

现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素。
广义来说我们有两种思路来提升程序性能:

  1. 并行化(parallelize) :使用更多的线程和硬件资源。[异步]
  2. 基于现有的资源来 提高执行效率 。
    通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。
    更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。
    所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。

2.2、异步可以解决问题吗?

第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。

但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:
● 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
● Futures :异步方法 立即 返回一个 Future,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。

这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。
回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。
考虑这样一种情景:
● 在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。
● 这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):
回调地狱(Callback Hell)的例子:

userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});

Reactor改造后为:

userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 

如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。

userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);

额外扩展:
Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。
考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。
CompletableFuture 处理组合的例子

CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip =l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));
});List<String> results = result.join(); 
assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121");

2.3、从命令式编程到响应式编程

类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
● 可编排性(Composability) 以及 可读性(Readability)
● 使用丰富的 操作符 来处理形如 流 的数据
● 在 订阅(subscribe) 之前什么都不会发生
● 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
● 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果

可编排性与可读性
可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。
这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。
Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

就像装配流水线
你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。
原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

操作符(Operators)
在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。
理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。
虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。

subscribe() 之前什么都不会发生

在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。

背压

向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。
在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。
中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。
这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。

热(Hot) vs 冷(Cold)

在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:
● 一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
● 一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。

三、核心特性

1、Mono和Flux

Mono: 0|1 数据流
Flux: N数据流

响应式流:元素(内容) + 信号(完成/异常);

2、subscribe()

自定义流的信号感知回调

flux.subscribe(v-> System.out.println("v = " + v), //流元素消费throwable -> System.out.println("throwable = " + throwable), //感知异常结束()-> System.out.println("流结束了...") //感知正常结束
);

自定义消费者

flux.subscribe(new BaseSubscriber<String>() {// 生命周期钩子1: 订阅关系绑定的时候触发@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println("绑定了..."+subscription);//找发布者要数据request(1); //要1个数据
//                requestUnbounded(); //要无限数据}@Overrideprotected void hookOnNext(String value) {System.out.println("数据到达,正在处理:"+value);request(1); //要1个数据}//  hookOnComplete、hookOnError 二选一执行@Overrideprotected void hookOnComplete() {System.out.println("流正常结束...");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常..."+throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调...一定会被执行");}});

3、流的取消

消费者调用 cancle() 取消流的订阅;
Disposable

        Flux<String> flux = Flux.range(1, 10).map(i -> {System.out.println("map..."+i);if(i==9) {i = 10/(9-i); //数学运算异常;  doOnXxx}return "哈哈:" + i;}); //流错误的时候,把错误吃掉,转为正常信号//        flux.subscribe(); //流被订阅; 默认订阅;
//        flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素//        flux.subscribe(
//                v-> System.out.println("v = " + v), //流元素消费
//                throwable -> System.out.println("throwable = " + throwable), //感知异常结束
//                ()-> System.out.println("流结束了...") //感知正常结束
//        );// 流的生命周期钩子可以传播给订阅者。//  a() {//      data = b();//  }flux.subscribe(new BaseSubscriber<String>() {// 生命周期钩子1: 订阅关系绑定的时候触发@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println("绑定了..."+subscription);//找发布者要数据request(1); //要1个数据
//                requestUnbounded(); //要无限数据}@Overrideprotected void hookOnNext(String value) {System.out.println("数据到达,正在处理:"+value);if(value.equals("哈哈:5")){cancel(); //取消流}request(1); //要1个数据}//  hookOnComplete、hookOnError 二选一执行@Overrideprotected void hookOnComplete() {System.out.println("流正常结束...");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常..."+throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调...一定会被执行");}});

4、BaseSubscriber

自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;

5、背压(Backpressure )和请求重塑(Reshape Requests)

5.1、buffer:缓冲

Flux<List<Integer>> flux = Flux.range(1, 10)  //原始流10个.buffer(3).log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
//        //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成

5.2、limit:限流

Flux.range(1, 1000).log()//限流触发,看上游是怎么限流获取数据的.limitRate(100) //一次预取30个元素; 第一次 request(100),以后request(75).subscribe();

6、以编程方式创建序列-Sink

Sink.next
Sink.complete

1、同步环境-generate
2、多线程-create

7、 handle()

自定义流中元素处理规则

   //Flux.range(1,10).handle((value,sink)->{System.out.println("拿到的值:"+value);sink.next("张三:"+value); //可以向下发送数据的通道}).log() //日志.subscribe();

8、自定义线程调度

响应式:响应式编程: 全异步、消息、事件回调
默认还是用当前线程,生成整个流、发布流、流操作

public void thread1(){Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i).log().publishOn(s).map(i -> "value " + i);//只要不指定线程池,默认发布者用的线程就是订阅者的线程;new Thread(() -> flux.subscribe(System.out::println)).start();
}

9、错误处理

命令式编程:常见的错误处理方式

9.1、Catch and return a static default value. 捕获异常返回一个静态默认值

try {return doSomethingDangerous(10);
}
catch (Throwable error) {return "RECOVERED";
}

onErrorReturn: 实现上面效果,错误的时候返回一个值
● 1、吃掉异常,消费者无异常感知
● 2、返回一个兜底默认值
● 3、流正常完成;

        Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorReturn(NullPointerException.class,"哈哈-6666").subscribe(v-> System.out.println("v = " + v),err -> System.out.println("err = " + err),()-> System.out.println("流结束")); // error handling example

9.2、Catch and execute an alternative path with a fallback method.

吃掉异常,执行一个兜底方法;

try {return doSomethingDangerous(10);
}
catch (Throwable error) {return doOtherthing(10);
}

onErrorResume
● 1、吃掉异常,消费者无异常感知
● 2、调用一个兜底方法
● 3、流正常完成

        Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Mono.just("哈哈-777")).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));

9.3、Catch and dynamically compute a fallback value. 捕获并动态计算一个返回值

根据错误返回一个新值

try {Value v = erroringMethod();return MyWrapper.fromValue(v);
}
catch (Throwable error) {return MyWrapper.fromError(error);
}
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))

● 1、吃掉异常,消费者有感知
● 2、调用一个自定义方法
● 3、流异常完成

9.4、Catch, wrap to a BusinessException, and re-throw.

捕获并包装成一个业务异常,并重新抛出

try {return callExternalService(k);
}
catch (Throwable error) {throw new BusinessException("oops, SLA exceeded", error);
}

包装重新抛出异常: 推荐用 .onErrorMap
● 1、吃掉异常,消费者有感知
● 2、抛新异常
● 3、流异常完成

.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorMap(err-> new BusinessException(err.getMessage()+": 又炸了...")).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));

9.5、Catch, log an error-specific message, and re-throw.

捕获异常,记录特殊的错误日志,重新抛出

try {return callExternalService(k);
}
catch (RuntimeException error) {//make a record of the errorlog("uh oh, falling back, service failed for key " + k);throw error;
}
        Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));

● 异常被捕获、做自己的事情
● 不影响异常继续顺着流水线传播
● 1、不吃掉异常,只在异常发生的时候做一件事,消费者有感知

9.6、Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.

        Flux.just(1, 2, 3, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).doFinally(signalType -> {System.out.println("流信号:"+signalType);})

9.7、忽略当前异常,仅通知记录,继续推进

Flux.just(1,2,3,0,5).map(i->10/i).onErrorContinue((err,val)->{System.out.println("err = " + err);System.out.println("val = " + val);System.out.println("发现"+val+"有问题了,继续执行其他的,我会记录这个问题");}) //发生.subscribe(v-> System.out.println("v = " + v),err-> System.out.println("err = " + err));

10、常用操作

filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith…

今日内容:
● 常用操作
● 错误处理
● 超时与重试
● Sinks工具类
○ 单播
○ 多播
○ 重放
○ 背压
○ 缓存
● 阻塞式API
○ block
● Context-API:响应式中的ThreadLocal
○ ThreadLocal机制失效

        Flux.just(1,2,3).transformDeferredContextual((flux,context)->{System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(i->i+"==>"+context.get("prefix"));})//上游能拿到下游的最近一次数据.contextWrite(Context.of("prefix","哈哈"))//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游.subscribe(v-> System.out.println("v = " + v));

● ParallelFlux:
○ 并发流

        Flux.range(1,1000000).buffer(100).parallel(8).runOn(Schedulers.newParallel("yy"))
.log()
.subscribe();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1551999.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

在 AI 大模型时代,了解 Agentic RAG 的核心理念至关重要

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ Agentic RAG&#xff0c;即基于智能体的检索增强生成技术&#xff0c;融合了 AI Agent 与 RAG 技术的优势。该技术通过集成 AI Agent&#xff0c;显著提升了 RAG 系统的智能水平与自主能力&#xff0c;…

1.7 编码与调制

欢迎大家订阅【计算机网络】学习专栏&#xff0c;开启你的计算机网络学习之旅&#xff01; 文章目录 前言前言1 基本术语2 常用的编码方法2.1 不归零编码2.2 归零编码2.3 反向归零编码2.4 曼彻斯特编码2.5 差分曼彻斯特编码 3 常用的调制方法3.1 调幅&#xff08;AM&#xff09…

Linux网络操作命令与函数全面总结

1. 引言 Linux作为服务器和开发平台&#xff0c;网络操作是其核心功能之一。本文旨在全面总结Linux系统中的网络操作方法&#xff0c;包括命令行工具和编程接口&#xff0c;帮助读者深入理解Linux网络管理的机制。 2. 命令行工具 2.1 ping 命令 ping 命令用于测试网络连接和…

【初阶数据结构】详解插入排序 希尔排序(内含排序的概念和意义)

文章目录 前言1. 排序的概念及其应用1.1 排序的概念1.2 排序的应用 2. 插入排序2.1 基本思想2.2 插入排序的代码实现2.3 插入排序算法总结 3. 希尔排序3.1 基本思想3.2 希尔排序的代码实现3.3 希尔排序的特征总结 前言 初级数据结构系列已经进入到了排序的部分了。相信大家听到…

DolphinScheduler 资源中心无法上传大文件

服务&#xff1a;dolphinscheduler 版本&#xff1a;v3.16 问题描述&#xff1a;资源中心-文件管理中使用文件上传是出现中断或上传失败 排除思路&#xff1a; 测试小文件或其他类型文件时是否正常&#xff1b;F12查看接口调用成功以及失败时的对比&#xff0c;发现接口调用…

内核级理解套接字和全连接队列

一、全连接队列 listen 函数第二个参数 backlog 是输入全连接队列的长度&#xff0c;一般不会太大。那如何理解全连接队列呢&#xff1f; 首先三次握手建立连接的过程和服务器是否 accept 无关&#xff0c;accept 的本质就是把已经建立的连接以文件描述符的形式返回。 那么在…

[含文档+PPT+源码等]精品大数据项目-基于Django实现的高校图书馆智能推送系统的设计与实现

大数据项目——基于Django实现的高校图书馆智能推送系统的设计与实现背景&#xff0c;可以从以下几个方面进行详细阐述&#xff1a; 一、信息技术的发展背景 随着信息技术的飞速发展和互联网的广泛普及&#xff0c;大数据已经成为现代社会的重要资源。在大数据背景下&#xf…

言语理解(3)

如果选项中填写的第一句话是文言文&#xff0c;那么尤其要注意它后面的第一句话 D B 要注意要填写的句子后面最近的一句话 文艺和时代和文章中的主题词&#xff0c;B和D的区别就是文艺带动时代向前发展&#xff0c;D是文艺和时代互相影响&#xff0c;从全文可知是文艺影响时代带…

墙绘艺术市场的数字化转型:SpringBoot案例

1 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范化管理。这样的大环境让那些止步不前&#…

常州威雅学校:欢迎探访校园,共赴全人教育之旅!

自2012年创校起&#xff0c;我们践行着“每一个孩子都卓越”的全人教育理念&#xff0c;见证了常州威雅发展至今天的方兴未艾。在岁月不居&#xff0c;时节如流间&#xff0c;我们用点点滴滴的耕耘&#xff0c;为学生的成长穿针引线&#xff0c;也在学校建设中精益求精。 一百次…

计算机毕业设计 服装生产信息管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

Golang | Leetcode Golang题解之第449题序列化和反序列化二叉搜索树

题目&#xff1a; 题解&#xff1a; type Codec struct{}func Constructor() (_ Codec) { return }func (Codec) serialize(root *TreeNode) string {arr : []string{}var postOrder func(*TreeNode)postOrder func(node *TreeNode) {if node nil {return}postOrder(node.Le…

SQL第10课挑战题

1. 从OrderItems表中返回每个订单号order_num各有多少行数order_lines&#xff0c;并按order_lines对结果进行排序 2. 返回名为cheapest_item的字段&#xff0c;该字段包含每个供应商成本最低的产品&#xff08;使用products表中的prod_price)&#xff0c;然后从最低成本到最高…

CMOS Sensor调试笔记

最近在调CMOS Sensor&#xff1b;基于无ISP的芯片。 第一步&#xff0c;找模组厂要到对应Sensor对应分辨率&#xff0c;YUV信息的驱动。 第二步&#xff0c;确认信号的极性&#xff0c;VSYNC&#xff0c;SYNC, PCLK。 第三步&#xff0c;开始测试。 问题解决&#xff1a; 1&am…

Unity Asset Store的默认下载位置及更改下载路径的方法

修改Unity Asset Store的默认下载路径 Unity Asset Store默认下载位置 Unity Asset Store里下载资源&#xff0c;默认是下载到C盘里的&#xff0c;如果你不想做C盘战士的话&#xff0c;记得将下载的资源转移到其他盘。 Unity商城默认下载路径是C:\用户\用户名&#xff08;一般…

ZYNQ: GPIO 之 MIO 控制 LED 实验

GPIO 之 MIO 控制 LED 实验目的 使用 GPIO 通过两个 MIO 引脚控制 PS 端两个 LED 的亮灭&#xff0c;实现底板上 PS_LED0、PS_LED1 两个 LED 灯同亮同灭的效果。 简介 ZYNQ PS 中的外设&#xff08;如 USB 控制器、UART 控制器、I2C 控制器以及 GPIO 等等&#xff09;可以通…

哈希表和字符串哈希算法

哈希 哈希表&#xff08;Hash Table&#xff09;是一种数据结构&#xff0c;它可以通过一个哈希函数将键&#xff08;key&#xff09;映射到存储位置&#xff0c;从而实现高效的数据查找、插入和删除操作。哈希表的特点是能够在常数时间&#xff08;O(1)&#xff09;内完成查找…

【韩顺平Java笔记】第4章:运算符

文章目录 61. 上一章总结62. 算术运算符介绍62.1 运算符介绍62.2 算术运算符介绍62.3 算术运算符一览 63. 算术运算符使用64. 算术运算符练习165. 算术运算符练习266. 67. 算术运算符练习3,468. 关系运算符介绍68.1 关系运算符介绍68.2 关系运算符一览 69. 关系运算符使用70. 逻…

仿真设计|基于51单片机的温湿度及PM2.5监测系统仿真

目录 具体实现功能 设计介绍 51单片机简介 资料内容 仿真实现&#xff08;protues8.7&#xff09; 程序&#xff08;Keil5&#xff09; 全部内容 资料获取 具体实现功能 &#xff08;1&#xff09;LCD1602液晶第一行显示当前的PM2.5值&#xff0c;第二行显示当前的温度…

观测云对接 SkyWalking 最佳实践

简介 SkyWalking 是一个开源的 APM&#xff08;应用性能监控&#xff09;和可观测性分析平台&#xff0c;专为微服务、云原生架构和基于容器的架构设计。它提供了分布式追踪、服务网格遥测分析、度量聚合和可视化一体化的解决方案。如果您的应用中正在使用SkyWalking &#xf…