Stream常用操作

Stream常用操作

Scroll Down

Stream常用操作

1.简介

1.1介绍

Java应用程序很少不用到集合类和数组,对集合的操作却不像sql那样方便简捷,java8的流支持这种简明的数据库查询式编程——但用的是Java语法,而无需了解数据库。我们要学习的是这种流式风格更是流式思想。
流是一系列数据项,一次只生成一项。程序可以从输入流中一个一个读取数据项,然后以同样的方式将数据项写入输出流。我们获取的数据来源都转化成流,通过加工流水线后产生另一个流,基于这一思想的就是StreamAPI,你可以将它看成遍历数据集的高级迭代器,在一个更高的抽象层次上写更简洁的Java 8代码。
java8对流做一些集合所不能的优化操作,设计成无需同时将所有的数据调入内存,这样就可以处理无法装入计算机内存的流数据,使之更高效的聚合操作(过滤、排序、统计分组)或者大数据批量操作(多核并行流处理)。
集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构, 它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。
流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。

2.Stream实际应用

2.1运行效率与优劣

实验一为基本类型数据,上述三个实验的结果可以总结如下:
1.对于简单操作,比如最简单的遍历,Stream串行API性能明显差于外部迭代,但并行的Stream API能够发挥多核特性。
2.对于复杂操作,Stream串行API性能可以和手动实现的效果略好,在并行执行时Stream API效果远超手动实现。
所以,如果出于性能考虑,1. 对于简单操作推荐使用外部迭代手动实现,2. 对于复杂操作,推荐使用Stream API, 3. 在多核情况下,推荐使用并行Stream API来发挥多核优势,4.单核情况下不建议使用并行Stream API。
如果出于代码简洁性考虑,使用Stream API能够写出更短的代码。简单操作消耗也是很低的,从性能方面说使用Stream API另外一个优势,那就是只要Java Stream类库做了升级优化,代码不用做任何修改就能享受到升级带来的好处。
2.2代码示例
在我们项目中使用stream的场景也是有很多,最多的还是因为平台框架的原因,会把数据转化成数组或者集合。但在代码评审中发现些奇怪的stream用法
原代码:

优化后代码:
发现都不需要返回list,流可以同时做到映射、去重和统计,不需要创建list遍历添加元素,而且在下面代码只用到获取长度,而这和approvalDys是相等的。

以及foreach

终结操作以外的操作,尽量避免副作用,避免突变基于堆栈的引用;传递给流操作的数据源应该是互不干扰(避免修改数据源)。

3.Stream工作过程

Stream流是连续的数据,生成流的数据源可以从集合list、set、array等类型数据获取,然后用stream进行中间操作处理包括过滤、排序、去重、聚合、映射等,最后执行终端操作存储和返回行程闭环。只需要专注stream的处理操作,代码更统一规范,熟练后省心省事。

3.1数据源

stream能从各数据源汇集数据进行统计处理,源数据可以是集合、数组、io等。
常用的创建stream的方式:

其它的等等:
Files类的获取文件路径列表: find(), lines(), list(), walk();

3.2中间操作

惰性编程、延时求值
中间操作只是对操作进行了记录,只有结束操作才会触发实际的计算(即惰性求值),这也是Stream在迭代大集合时高效的原因之一。

常用的流操作:
无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;
filter:接收boolean表达式过滤元素
distin:去重
map:将流中元素1:1映射生成新元素
sorted:排序
Peek:调试用,输出Stream流水线操作之前和操作之后的中间值

有些人习惯多次过滤,其实可以同时过滤多个条件的

3.3终端操作

只能有一个,而且是最后一个,最终产生结果,常用的:
Collect:收集器,返回新的元素集合,可以返回list、set、map等
toArray:返回数组
forEach:消费流中的每个元素并对其应用 Lambda。这一操作返回 void
allMatch:查找与匹配
结束操作可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,找到第符合条件的元素就可以得到最终结果。而后者是指必须处理所有元素才能得到最终结果。

流的流水线背后的理念类似于构建器模式。在构建器模式中有一个调用链用来设置一套配
置(对流来说这就是一个中间操作链),接着是调用built方法(对流来说就是终端操作)。

4.Stream原理

4.1Stream流与Collection区别

Stream操作特征
Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。当终止操作时会将我们编写的代码和流自身的数据方法一同执行。

在访问者模式(Visitor Pattern)中,我们使用了一个访问者类,它改变了元素类的执行算法。通过这种方式,元素的执行算法可以随着访问者改变而改变。这种类型的设计模式属于行为型模式。根据模式,元素对象已接受访问者对象,这样访问者对象就可以处理元素对象上的操作。

从实现角度比较,Stream和Collection也有众多不同:
不存储数据。 流不是一个存储元素的数据结构。 它只是传递源(source)的数据。
功能性的(Functional in nature)。 在流上操作只是产生一个结果,不会修改源。 例如filter只是生成一个筛选后的stream,不会删除源里的元素。
延迟搜索。 许多流操作, 如filter, map等,都是延迟执行。 中间操作总是lazy的。
Stream可能是无界的。 而集合总是有界的(元素数量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的时间内完成在无界的stream
可消费的(Consumable)。 不是太好翻译, 意思流的元素在流的声明周期内只能访问一次。 再次访问只能再重新从源中生成一个Stream

4.2Stream流水线介绍

流是一串连续的数据操作过程,如果每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中,这样做实现起来非常简单直观,但有两个明显的弊端,这些弊端使得效率底下
1.迭代次数多。迭代次数跟函数调用的次数相等。
2.频繁产生中间结果。每次函数调用都产生一次中间结果,存储开销无法接受。
Stream流减少了迭代次数,也避免了存储中间结果,会把多个操作放在了一次迭代当中。类似代码实现:

只要事先知道用户意图,总是能够采用上述方式实现跟Stream API等价的功能。如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。

4.3Stream包结构分析

Stream包的结构

其中各个部分的主要功能为:
1.主要是各种操作的工厂类、数据的存储结构以及收集器的工厂类等;
2.主要用于Stream的惰性求值实现;
3.Stream的并行计算框架;
4.存储并行流的中间结果;
5.终结操作的定义;
我们单独把第二部分拎出来用于说明Stream的惰性求值实现,如图1-2所示,Java8针对Int、long、double进行了优化,主要用于频繁的拆装箱。我们以引用类型进行介绍,在图中已经标为绿色。
BaseStream规定了流的基本接口,比如iterator、spliterator、isParallel等;
Stream中定义了map、filter、flatmap等用户关注的常用操作;
PipelineHelper主要用于Stream执行过程中相关结构的构建;
Head、StatelessOp、StatefulOp为ReferencePipeline中的内部类。

4.4Stream流水线解决方案

我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:
1.用户的操作如何记录?
2.操作如何叠加?
3.叠加之后的操作如何执行?
4.执行后的结果(如果有)在哪里?
操作记录
“操作(operation)”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示:

还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。

图中通过Collection.stream()方法得到Head也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。

操作叠加
以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。
在上一步已经在stage中记录了每一步操作,此时并没有执行。但是stage只是保存了当前的操作,并不能确定下一个stage需要何种操作,何种数据,其实JDK为此定义了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四个接口(如表1-2所示),其中中间操作的子类中包含一个指向下游sink的指针。
Sink接口包含的方法如下表所示:

有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的begin()和end()方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个乘放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操作,Sink.cancellationRequested()也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法。
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的Sink.accept()方法流程是这样的:

Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:

上述代码逻辑其实就是将回调函数mapper包装到一个Sink当中。由于Stream.map()是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。
再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:

上述代码完美的展现了Sink的四个接口方法是如何协同工作的:
1.首先beging()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
2.之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;
3.最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
4.如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。
叠加之后的操作执行

Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。
结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。
我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:

现在转向code_3,可以看出,在satge链中,每一步都包含了opWrapSink()。当调用终结操作时,将会触发code_5从最后一个stage(终结操作产生的satge)开始,递归产生图1-4所示的结构。
现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:

执行后的结果
所有的操作已经形成了图1-4的结构,接下来就会触发code_6,此时结果就会产生对应的结果

最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景,大多数使用副作用的地方都可以使用归约操作更安全和有效的完成。对于真正需要返回结果的结束操作结果存在哪里呢?

有返回值的结束操作表

1.对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。
2.对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(), reduce(), max(), min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。
3.对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Node的具体结构,我们会在下一节探究Stream如何并行执行时给出详细说明。

4.5并行原理

并行原理
那么,Stream是如何并行执行的呢?其实产生stage链的过程和串行并没有区别,只是在最终执行时进行了相应的调整改变为code_7

那么最终产生的stage链与sink的结构如图1-5所示,因为此时stage链中有一个有状态操作(sorted()),也就是说在这里必须处理完所有元素才能进行下一步操作。那么此时无论是并行还是串行,此时都会产生两个sink链,也就是代表了两次迭代,才产生了最终结果。

那么,究竟是如何并行的呢?其实当调用collect操作时会调用code_8,其中的evaluateParallel()如code_9所示。

其实Stream的并行处理是基于ForkJoin框架的,相关类与接口的结构如图1-6所示。其中AbstractShortCircuitTask用于处理短路操作,其他相关操作类似,会产生对应的Task。

关于code_8中获取源Spliterator,如code_10所示,

并行执行
关于各个task就行是如何并行执行,其实最终调用的是code_11所示,对应的流程如图1-7所示,其中交替fork子节点是为了缓和数据分片不均造成的性能退化。

影响并行流的因素
数据大小;源数据结构(分割越容易越好),arraylist、数组比较好,hashSet、treeSet次之,linked最差;装箱;核的数量(可使用);单元处理开销(越大越好)
建议:
终结操作以外的操作,尽量避免副作用,避免突变基于堆栈的引用,或者在执行过程中进行任何I/O;传递给流操作的数据源应该是互不干扰(避免修改数据源)。

5.总结

5.1总结

这里主要介绍了Stream实现原理,以及流水线的结构和执行过程,有助于理解原理并写出正确的Stream代码,同时打消你对Stream API效率方面的顾虑。如你所见,Stream API实现如此巧妙,即使我们使用外部迭代手动编写等价代码,也未必更加高效。

5.2参考资料

Java 8 in Action(Java8实战中文版)
https://www.cnblogs.com/CarpenterLee/p/6637118.html(深入理解stream流水线)
https://www.cnblogs.com/Dorae/p/7779246.html(stream原理解析)
https://colobu.com/2014/11/18/Java-8-Stream/(stream探秘)