【Java】Stream流水线实现

Stream流水线

流水线的执行

Stream的操作可以分为两类: 中间操作和终端操作, 中间操作只是一种标记, 而只有终端操作才会触发最终的计算. 而中间操作又可以分为有状态操作和无状态操作, 其中无状态操作指元素处理不受前面的元素影响, 而有状态操作会受前面元素的影响. 终端操作又可以分为短路操作和非短路操作, 短路操作指无需处理完全部元素即可返回结果, 而非短路操作必须处理完全部的元素, 如的例子

@Test
public void pipelineLimit() {IntStream.range(1,10).boxed().peek(x -> System.out.printf("A%d%n", x)).limit(3).peek(x -> System.out.printf("B%d%n", x)).forEach(x -> System.out.printf("C%d%n", x));
}

这段代码输出的是

A1
B1
C1
A2
B2
C2
A3
B3
C3

只输出三组值的原因在于只有终端操作才会触发流水线的执行, 即只有forEach执行才会执行整个流水线; 而当第四次触发流水线的时候, 触发了limit的条件, 即刻终止流, 因此只输出了三组数据.

在流中, skip类似于continue, 而limit类似于break; 它并不会中断整个流的执行, 而只会中断当前流水线的执行, 如下面的例子

@Test
public void pipelineSkip() {IntStream.range(1, 10).boxed().peek(x -> System.out.printf("A%d%n", x)).skip(6).peek(x -> System.out.printf("B%d%n", x)).forEach(x -> System.out.printf("C%d%n", x));
}

它的输出为:

A1
A2
A3
A4
A5
A6
A7
B7
C7
A8
B8
C8
A9
B9
C9

每一个forEach都触发了流水线的执行, 但是当流水线执行到skip就不会执行后面的内容了, 因此前六个元素只会打印A, 而后三个元素不会被skip, 因此才会打印B/C

对于带有有状态操作的场景, 有状态的操作会执行完该操作前面的所有操作

public void statefulPipeline() {Stream.of(1, 6, 2, 5, 4, 3, 9, 8, 7).peek(x -> System.out.printf("A%d%n", x)).sorted().peek(x -> System.out.printf("B%d%n", x)).forEach(x -> System.out.printf("C%d%n", x));
}

它会执行完标记为A的无序的peek, 然后在逐个执行sorted后面的操作

A3
A9
A8
A7
B1
C1
B2
C2
B3
C3
B4
C4
B5
C5
B6
C6
B7
C7
B8
C8
B9
C9

Stream流的实现过程会修改执行的范围, 如下面的例子中, 因为peekcount的结果没有任何影响, 所以Stream的实现过程中会将peek流程省略掉

	public void shouldOptimizeExecution() {List<String> l = Arrays.asList("A", "B", "C", "D");long count = l.stream().peek(System.out::println).count();System.out.println(count);}
// peek不会被执行, 因为中间操作不会影响count()结果

流水线构造

在第一次到达终端操作, 会调用java.util.stream.AbstractPipeline#wrapAndCopyInto构造流水线; 它会先调用java.util.stream.AbstractPipeline#wrapSink将操作包装为链表, 然后再调用java.util.stream.AbstractPipeline#copyInto; 此时若检查到了短路操作, 则会优化执行流程, 否则则调用java.util.stream.AbstractPipeline#copyIntoWithCancel执行正常的执行流程

而对于类似count这样的短路操作,它构建流水线的流程相对于forEach较为简单; 它直接在java.util.stream.AbstractPipeline#exactOutputSizeIfKnown中对每个操作判断是否会导致长度变化, 以此可以跳过某些非必要的操作, 进而达到性能优化的效果

流水线操作的实现

在流水线中, 若是无状态操作的实现则非常简单, 只需要将所有的操作形成一个线性表, 然后按顺序执行就可以了因此不过多介绍; 但是这对于有状态操作无能为力, 有状态操作(sort)需要等待前面所有的任务都执行完才能开始执行, 倘若每个链路都完全执行的话就会浪费许多计算资源, 而每个链路逐步执行的话有状态操作又会因为状态信息不是最新而产生错误的计算结果. 为了协调好相邻操作的关系, Stream引入了Sink接口完成交互.

interface Sink<T> extends Consumer<T> {// 开始遍历元素前会调用此方法, 通知sink做好准备default void begin(long size) {}// 完成遍历后调用此方法, 通知sink没有更多的元素了, 并准备通知下游default void end() {}// 对于短路操作, 告诉流水线是否可以结束操作, 返回true即可结束操作default boolean cancellationRequested() {return false;}// 遍历元素时调用, 对当前的元素进行处理; // 前一个sink会调用当前sink的accept, 而当前sink会调用后一个sink的accept; 此过程会完成装啊提的传递// 这个方法继承于Consumer// sink中还对基本类型提供了accept, 这里就不展示了void accept(T t);
}

以下通过mappeek介绍无状态中间操作, 他们都是简单地调用方法然后通知下游即可

再通过sorted有状态非短路操作, 它会在开始执行前将所有的数据都准备好, 然后调用Arrays.sort进行排序

之后通过skiplimit介绍有状态短路操作, 之所以把他们放在一起的原因在于它们是通过一个类SliceOps实现的

最后通过forEach介绍终端操作, 它也是简单地对元素进行操作

mappeek的实现

map为例, 这是一个简单的无状态的中间操作, 它直接调用mapper中的方法, 然后传递给下游的Sink

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {@Overridepublic void accept(P_OUT u) {// 这里直接通知下游接收当前操作的结果downstream.accept(mapper.apply(u));}};}};
}

map类似, peek通过action修改元素值, 然后再传递给下游

@Override
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {Objects.requireNonNull(action);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,0) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void accept(P_OUT u) {// 与map十分相似, 只是从Function变为了Consumer// 使得流水线可以对元素的值修改生效action.accept(u);downstream.accept(u);}};}};
}

Sorted实现

sorted就是一个有状态的中间操作, 它需要所有元素都完成转换后才能够进行排序; 通过断点调试, 对于自定义Comparatorsorted操作, 调用的是SizedRefSortingSink, 它可以用于排序有大小的容器. 它本质上就是先将所有的数据都存放到一个集合中, 然后在调用Arrays.sort, 调用完成后再根据是否是短路操作判断是否要通知下游

private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {private T[] array;private int offset;SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {super(sink, comparator);}@Override@SuppressWarnings("unchecked")public void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)throw new IllegalArgumentException(Nodes.BAD_SIZE);// 1. 创建一个存放待排序元素的列表array = (T[]) new Object[(int) size];}@Overridepublic void end() {// 3. 存放完成后开始排序Arrays.sort(array, 0, offset, comparator);// 排序完成后调用下游操作, 且只将非短路操作下发downstream.begin(offset);if (!cancellationRequestedCalled) {for (int i = 0; i < offset; i++)downstream.accept(array[i]);}else {for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)downstream.accept(array[i]);}downstream.end();array = null;}// 2. 将所有的元素都存放到临时列表当中@Overridepublic void accept(T t) {array[offset++] = t;}
}

limit/skip实现

limit的实现同前两者也类似, 它维护了一个计数器保存limit的参数值, 只有小于这个参数才会执行downstream

@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {return new Sink.ChainedReference<>(sink) {long n = skip;long m = normalizedLimit;@Overridepublic void begin(long size) {// 多个limit/slice之间通过调用链传递最终的长度downstream.begin(calcSize(size, skip, m));}@Overridepublic void accept(T t) {// n==0, 说明是limit模式, 根据m进行判断if (n == 0) {// 根据计数器来判断是否要执行下一步if (m > 0) {m--;downstream.accept(t);}}// 若是skip模式, 当skip计数器未将为0时, 跳过当前元素else {n--;}}@Overridepublic boolean cancellationRequested() {// limit是短路操作, 当m为0的时候短路整个流水线return m == 0 || downstream.cancellationRequested();}};
}

forEach实现

static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;OfRef(Consumer<? super T> consumer, boolean ordered) {super(ordered);this.consumer = consumer;}@Overridepublic void accept(T t) {consumer.accept(t);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/148018.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

机器人中的数值优化|【六】线性共轭梯度法,牛顿共轭梯度法

机器人中的数值优化|【六】线性共轭梯度法&#xff0c;牛顿共轭梯度法 往期回顾 机器人中的数值优化|【一】数值优化基础 机器人中的数值优化|【二】最速下降法&#xff0c;可行牛顿法的python实现&#xff0c;以Rosenbrock function为例 机器人中的数值优化|【三】无约束优化…

stm32 - 中断

stm32 - 中断 概念中断向量表NVIC 嵌套中断向量控制器优先级 中断EXTI概念基本结构例子- 对射式红外传感器计次例子 - 旋转编码器 概念 stm32 支持的中断资源&#xff08;都属于外设&#xff09; EXTITIMADCUSARtSPII2C stm32支持的中断 内核中断 外设中断 中断通道与优先级 一…

C# 读取Execl文件3种方法

方法 1&#xff0c;使用OLEDB可以对excel文件进行读取 1.1C#提供的数据连接有哪些 对于不同的.net数据提供者&#xff0c;ADO.NET采用不同的Connection对象连接数据库。这些Connection对我们屏蔽了具体的实现细节&#xff0c;并提供了一种统一的实现方法。 Connection类有四…

【Linux】线程池

目录 一、线程池1.什么是线程池2.线程池图解3.实现代码 二、单例模式1.单例模式的概念2.饿汉方式实现单例模式3.懒汉方式实现单例模式4.懒汉方式实现单例模式的线程池 一、线程池 1.什么是线程池 线程虽然比进程轻量了很多&#xff0c;但是每创建一个线程时&#xff0c;需要向…

UCOS的任务创建和删除

一、任务创建和删除的API函数 1、任务创建和删除本质就是调用uC/OS的函数 API函数 描述 OSTaskCreate() 创建任务 OSTaskDel() 删除任务 注意&#xff1a; 1&#xff0c;使用OSTaskCreate() 创建任务&#xff0c;任务的任务控制块以及任务栈空间所需的内存&#xff0c…

算法——买卖股票问题

309. 买卖股票的最佳时机含冷冻期 - 力扣&#xff08;LeetCode&#xff09; 一、 究其就是个动态规划的问题 算法实现图 初始化 由于有三个阶段&#xff0c;买入&#xff0c;可交易&#xff0c;冷冻期&#xff0c;那么用dp表表示现在为止的最大利润&#xff0c;则有 dp[0][…

asp.net core 远程调试

大概说下过程&#xff1a; 1、站点发布使用Debug模式 2、拷贝到远程服务器&#xff0c;以及iis创建站点。 3、本地的VS2022的安装目录&#xff1a;C:\Program Files\Microsoft Visual Studio\2022\Professional\Common7\IDE下找Remote Debugger 你的服务器是64位就拷贝x64的目…

详解Linux的系统调用fork()函数

在Linux系统中&#xff0c;fork()是一个非常重要的系统调用&#xff0c;它的作用是创建一个新的进程。具体来说&#xff0c;fork()函数会在当前进程的地址空间中复制一份子进程&#xff0c;并且这个子进程几乎完全与父进程相同&#xff0c;包括进程代码、数据、堆栈以及打开的文…

WebSocket实战之四WSS配置

一、前言 上一篇文章WebSocket实战之三遇上PAC &#xff0c;碰到的问题只能上安全的WebSocket&#xff08;WSS&#xff09;才能解决&#xff0c;配置证书还是挺麻烦的&#xff0c;主要是每年都需要重新更新证书&#xff0c;我配置过的证书最长有效期也只有两年&#xff0c;搞不…

ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装

ElasticSearch第四讲&#xff1a;ES详解&#xff1a;ElasticSearch和Kibana安装 本文是ElasticSearch第四讲&#xff1a;ElasticSearch和Kibana安装&#xff0c;主要介绍ElasticSearch和Kibana的安装。了解完ElasticSearch基础和Elastic Stack生态后&#xff0c;我们便可以开始…

ctfshow—1024系列练习

1024 柏拉图 有点像rce远程执行&#xff0c;有四个按钮&#xff0c;分别对应四份php文件&#xff0c;开始搞一下。一开始&#xff0c;先要试探出 文件上传到哪里&#xff1f; 怎么读取上传的文件&#xff1f; 第一步&#xff1a;试探上传文件位置 直接用burp抓包&#xff0c;…

力扣练习——链表在线OJ

目录 提示&#xff1a; 一、移除链表元素 题目&#xff1a; 解答&#xff1a; 二、反转链表 题目&#xff1a; 解答&#xff1a; 三、找到链表的中间结点 题目&#xff1a; 解答&#xff1a; 四、合并两个有序链表&#xff08;经典&#xff09; 题目&#xff1a; 解…

【数据结构---排序】很详细的哦

本篇文章介绍数据结构中的几种排序哦~ 文章目录 前言一、排序是什么&#xff1f;二、排序的分类 1.直接插入排序2.希尔排序3.选择排序4.冒泡排序5.快速排序6.归并排序总结 前言 排序在我们的生活当中无处不在&#xff0c;当然&#xff0c;它在计算机程序当中也是一种很重要的操…

聊聊常见的IO模型 BIO/NIO/AIO 、DIO、多路复用等IO模型

聊聊常见的IO模型 BIO/NIO/AIO/DIO、IO多路复用等IO模型 文章目录 一、前言1. 什么是IO模型2. 为什么需要IO模型 二、常见的IO模型1. 同步阻塞IO&#xff08;Blocking IO&#xff0c;BIO&#xff09;2. 同步非阻塞IO&#xff08;Non-blocking IO&#xff0c;NIO&#xff09;3.…

排序算法之【希尔排序】

&#x1f4d9;作者简介&#xff1a; 清水加冰&#xff0c;目前大二在读&#xff0c;正在学习C/C、Python、操作系统、数据库等。 &#x1f4d8;相关专栏&#xff1a;C语言初阶、C语言进阶、C语言刷题训练营、数据结构刷题训练营、有感兴趣的可以看一看。 欢迎点赞 &#x1f44d…

八大排序源码(含优化)

文章目录 1、直接插入排序2、希尔排序3、选择排序4、冒泡排序5、堆排序6、快速排序快速排序递归实现霍尔法挖坑法前后指针法快速排序小区间优化 快速排序非递归实现 7、归并排序归并排序递归实现归并排序非递归 8、计数排序 大家好&#xff0c;我是纪宁&#xff0c;这篇文章是关…

java Spring Boot 自动启动热部署 (别再改点东西就要重启啦)

上文 java Spring Boot 手动启动热部署 我们实现了一个手动热部署的代码 但其实很多人会觉得 这叫说明热开发呀 这么捞 写完还要手动去点一下 很不友好 其实我们开发人员肯定是希望重启这种事不需要自己手动去做 那么 当然可以 我们就让它自己去做 Build Project 这个操作 我们…

Linux性能优化--性能工具:系统内存

3.0.概述 本章概述了系统级的Linux内存性能工具。本章将讨论这些工具可以测量的内存统计信息&#xff0c;以及如何使用各种工具收集这些统计结果。阅读本章后&#xff0c;你将能够&#xff1a; 理解系统级性能的基本指标&#xff0c;包括内存的使用情况。明白哪些工具可以检索…

Java21 新特性

文章目录 1. 概述2. JDK21 安装与配置3. 新特性3.1 switch模式匹配3.2 字符串模板3.3 顺序集合3.4 记录模式&#xff08;Record Patterns&#xff09;3.5 未命名类和实例的main方法&#xff08;预览版&#xff09;3.6 虚拟线程 1. 概述 2023年9月19日 &#xff0c;Oracle 发布了…

电子计算机核心发展(继电器-真空管-晶体管)

目录 继电器 最大的机电计算机之一——哈弗Mark1号&#xff0c;IBM1944年 背景 组成 性能 核心——继电器 简介 缺点 速度 齿轮磨损 Bug的由来 真空管诞生 组成 控制开关电流 继电器对比 磨损 速度 缺点 影响 代表 第一个可编程计算机 第一个真正通用&am…