Java8 Stream编程源码分析
本文主要记录自己的初步学习过程,部分资料并非本人原创,引用到的相关资料统一列在文末的“参考资料”中。
前期准备
Spliterator源码分析
Spliterator是一个可分割迭代器(splittable iterator),即可以将迭代器进行分割,从而实现并行处理,在多核处理器场景下相比串行处理有很大优势。
以ArrayList.stream()为入口(注意:ArrayList重写了spliterator()方法并返回自己的ArrayListSpliterator,下文跟踪的是Collection接口中的默认实现)
Spliterator源码分析
package java.util;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
/**
 * A splittable iterator: traverses the elements of a source and can hand off part of
 * them to another Spliterator so that the two parts can be processed independently.
 * The nested primitive specializations are elided ("...") in this listing.
 */
public interface Spliterator<T> {
// Characteristic bit flags. characteristics() returns a bitwise OR of these values
// to describe the properties of an implementation.
// Elements have a defined encounter order (every traversal yields the same order).
public static final int ORDERED = 0x00000010;
// No two elements are equal (no duplicates).
public static final int DISTINCT = 0x00000001;
// Elements follow a sort order (see getComparator()).
public static final int SORTED = 0x00000004;
// The number of remaining elements is known exactly (see getExactSizeIfKnown()).
public static final int SIZED = 0x00000040;
// The source contains no null elements.
public static final int NONNULL = 0x00000100;
// The element source cannot be structurally modified.
public static final int IMMUTABLE = 0x00000400;
// The source may be used from multiple threads (presumably: safely modified
// concurrently without external synchronization -- confirm against the JDK Javadoc).
public static final int CONCURRENT = 0x00001000;
// Every Spliterator produced by trySplit() is itself SIZED.
public static final int SUBSIZED = 0x00004000;
// If an element remains, performs the given action on it and returns true;
// otherwise returns false.
// For an ORDERED spliterator the action is applied in encounter order.
boolean tryAdvance(Consumer<? super T> var1);
// Performs the action on every remaining element, one at a time, until all elements
// are processed or the action throws. The default simply loops on tryAdvance().
default void forEachRemaining(Consumer<? super T> var1) {
while(this.tryAdvance(var1)) {
;
}
}
// Splits off part of the remaining work and returns it as a new Spliterator.
// NOTE(review): per the JDK Javadoc this may return null when no split is possible.
Spliterator<T> trySplit();
// Returns an estimate of how many elements remain to be traversed.
long estimateSize();
// Returns estimateSize() when the SIZED bit (0x40 == 64) is set in characteristics();
// otherwise returns -1.
default long getExactSizeIfKnown() {
return (this.characteristics() & 64) == 0 ? -1L : this.estimateSize();
}
// Returns the set of characteristic flags of this spliterator.
int characteristics();
// Returns true only if ALL bits of var1 are present in characteristics().
default boolean hasCharacteristics(int var1) {
return (this.characteristics() & var1) == var1;
}
// The default implementation always throws IllegalStateException.
// Intended contract (from the original article / JDK Javadoc): a source sorted by a
// Comparator returns that Comparator, a naturally ordered source returns null, and
// any other source throws -- implementations reporting SORTED are expected to override.
default Comparator<? super T> getComparator() {
throw new IllegalStateException();
}
public interface OfDouble extends Spliterator.OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble> {
...
}
public interface OfInt extends Spliterator.OfPrimitive<Integer, IntConsumer, Spliterator.OfInt> {
...
}
public interface OfLong extends Spliterator.OfPrimitive<Long, LongConsumer, Spliterator.OfLong> {
...
}
public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> extends Spliterator<T> {
...
}
}
获取对应的Spliterator
//1、Collection
/**
 * Creates a stream over this collection by passing the result of spliterator() to
 * StreamSupport.stream with the second argument false.
 * (presumably that flag selects a sequential rather than parallel stream -- confirm)
 */
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
//2、Collection
/**
 * Default Collection spliterator: delegates to Spliterators.spliterator with this
 * collection and no extra characteristics (0).
 */
default Spliterator<E> spliterator() {
// 'this' is the collection on which spliterator() was invoked.
// NOTE(review): java.util.ArrayList overrides spliterator(), so this default body is
// only reached for collections that do not provide their own -- confirm against the JDK.
return Spliterators.spliterator(this, 0);
}
//3、Spliterator工具类Spliterators
/**
 * Wraps a collection in an IteratorSpliterator.
 *
 * @param c the collection to traverse; must not be null
 * @param characteristics characteristic flags forwarded to the IteratorSpliterator
 * @return a new spliterator over {@code c}
 * @throws NullPointerException if {@code c} is null
 */
public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
                                             int characteristics) {
    // Fail fast on a null source before constructing anything.
    Objects.requireNonNull(c);
    return new IteratorSpliterator<>(c, characteristics);
}
//IteratorSpliterator为Spliterators内部类,实现了Spliterator
/**
 * Creates a spliterator that keeps a reference to the collection and defers creating
 * the iterator ({@code it} starts out null).
 *
 * @param collection the collection to traverse
 * @param characteristics the caller-supplied characteristic flags
 */
public IteratorSpliterator(Collection<? extends T> collection, int characteristics) {
    // Keep a reference to the collection itself.
    this.collection = collection;
    this.it = null;
    if ((characteristics & Spliterator.CONCURRENT) == 0) {
        // Without CONCURRENT the spliterator additionally reports SIZED and SUBSIZED.
        this.characteristics = characteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
    } else {
        // CONCURRENT requested: use the flags exactly as given.
        this.characteristics = characteristics;
    }
}
Stream编程思想
一个Stream 包含一个流来源、0 或多个中间操作,以及一个终止操作。
流来源可以是集合、数组、生成器函数或其他任何适当地提供了其元素的访问权的数据源。
中间操作将流转换为其他流 — 例如过滤元素(filter())、转换元素(map())、排序元素(sorted())、将流截断为一定大小(limit())等等。
终止操作包括聚合(reduce()、collect())、搜索(findFirst())和迭代(forEach())。
流管道是惰性构造的。构造流来源不会计算流的元素,而是会确定在必要时如何找到元素。类似地,调用中间操作不会在元素上执行任何计算;只会将另一个操作添加到流描述的末尾。仅在调用终止操作时,管道才会实际执行相应的工作:计算元素,应用中间操作,以及应用终止操作。这种执行方法使得执行多项有趣的优化成为可能。
Java中的Stream
借助 java.util.stream 包,您可以简明地、声明性地表达集合、数组和其他数据源上可能的并行批量操作。
如何使用Java中Stream
- 创建流
- 执行中间操作
- 执行终止操作
Stream流水线组织结构示意图如下:
简要继承
详细继承
PipelineHelper源码
package java.util.stream;
import java.util.Spliterator;
import java.util.function.IntFunction;
import java.util.stream.Node.Builder;
/**
 * Abstraction of a stream pipeline.
 *
 * For evaluation, stream operations are composed into a pipeline consisting of a data
 * source, zero or more intermediate operations and a terminal operation. The source may
 * be an array, a collection, an I/O channel or a generator function. Intermediate
 * operations (such as filter or map) turn one stream into another; a terminal operation
 * produces a result or a side effect (for example collecting into a collection or
 * reducing to a number). Streams are lazy: the source is only consumed once the
 * terminal operation starts, and only as far as needed.
 *
 * Every method here is abstract; this listing shows no implementation in this class.
 */
abstract class PipelineHelper<P_OUT> {
PipelineHelper() {
}
/**
 * Returns the shape of the pipeline SOURCE (the original article comment said "output
 * shape", which does not match the method name).
 * StreamShape is an enum; per the article its values are REFERENCE, INT_VALUE,
 * LONG_VALUE and DOUBLE_VALUE, with REFERENCE being the usual case for object streams.
 */
abstract StreamShape getSourceShape();
// Returns the combined flags: stream flags together with operation flags.
abstract int getStreamAndOpFlags();
/**
 * Returns the exact size of the output produced by applying the pipeline stages
 * described by this helper to the input described by the given Spliterator, if known;
 * returns -1 if the size is not known (or, per the JDK Javadoc, known to be infinite).
 * Presumably "known" is decided by the SIZED characteristic/flag -- confirm against
 * the implementing class.
 */
abstract <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> var1);
/**
 * Wraps the given sink (see wrapSink, declared below) and then pushes the elements of
 * the Spliterator into the wrapped sink.
 */
abstract <P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S var1, Spliterator<P_IN> var2);
// Pushes the elements of the Spliterator into the given (already wrapped) sink.
abstract <P_IN> void copyInto(Sink<P_IN> var1, Spliterator<P_IN> var2);
// Variant of copyInto; presumably it can stop early when the sink requests
// cancellation (short-circuiting) -- confirm against the implementing class.
abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> var1, Spliterator<P_IN> var2);
/**
 * Called when a terminal operation starts: wraps the intermediate operations of the
 * pipeline around the given sink and returns the resulting sink, which accepts the
 * pipeline's input elements.
 */
abstract <P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> var1);
// Wraps a source Spliterator so that it yields this pipeline's output elements.
abstract <P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> var1);
// Creates a Node builder; arguments are a size (long) and an array generator.
abstract Builder<P_OUT> makeNodeBuilder(long var1, IntFunction<P_OUT[]> var3);
/** Executed when a terminal operation is reached; collects the output into a Node. */
abstract <P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> var1, boolean var2, IntFunction<P_OUT[]> var3);
}
AbstractPipeline源码
package java.util.stream;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.Node.Builder;
/**
 * Base class of pipeline stages (decompiled listing; parameters are named var1, var2, ...).
 * Each instance is one stage: either the source stage (depth 0, previousStage == null)
 * or an operation stage linked after an upstream stage. All stages of one pipeline share
 * the same sourceStage, which owns the source spliterator/supplier, the parallel flag
 * and the close action.
 */
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// Message constants. The decompiled methods below inline the same literals instead of
// referencing these fields.
private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
private static final String MSG_CONSUMED = "source already consumed or closed";
// First stage of the pipeline (this for a source stage).
private final AbstractPipeline sourceStage;
// Upstream stage, or null for the source stage.
private final AbstractPipeline previousStage;
// Flags of this stage: source flags (masked with STREAM_MASK) for a source stage,
// operation flags (masked with OP_MASK) for an operation stage.
protected final int sourceOrOpFlags;
// Downstream stage, set when another stage is linked after this one.
private AbstractPipeline nextStage;
// Number of stages between this stage and the source (0 for the source); may be
// rewritten by sourceSpliterator(int) and evaluateToArrayNode for parallel pipelines.
private int depth;
// Flags of this stage combined with those of all upstream stages.
private int combinedFlags;
// Source spliterator; only meaningful on the source stage, nulled once taken.
private Spliterator<?> sourceSpliterator;
// Alternative lazy source; only meaningful on the source stage, nulled once taken.
private Supplier<? extends Spliterator<?>> sourceSupplier;
// True once a downstream stage was linked or the stage was consumed/closed.
private boolean linkedOrConsumed;
// On the source stage: true if any stage in the pipeline is stateful.
private boolean sourceAnyStateful;
// On the source stage: action(s) to run on close(), or null.
private Runnable sourceCloseAction;
// On the source stage: whether the pipeline runs in parallel.
private boolean parallel;
// Source-stage constructor taking a lazily evaluated Supplier of the source spliterator.
// var2 = source flags, var3 = parallel.
AbstractPipeline(Supplier<? extends Spliterator<?>> var1, int var2, boolean var3) {
this.previousStage = null;
this.sourceSupplier = var1;
this.sourceStage = this;
this.sourceOrOpFlags = var2 & StreamOpFlag.STREAM_MASK;
this.combinedFlags = ~(this.sourceOrOpFlags << 1) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = var3;
}
// Source-stage constructor taking the source spliterator directly.
// var2 = source flags, var3 = parallel.
AbstractPipeline(Spliterator<?> var1, int var2, boolean var3) {
this.previousStage = null;
this.sourceSpliterator = var1;
this.sourceStage = this;
this.sourceOrOpFlags = var2 & StreamOpFlag.STREAM_MASK;
this.combinedFlags = ~(this.sourceOrOpFlags << 1) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = var3;
}
// Operation-stage constructor: appends this stage after upstream stage var1 with
// operation flags var2. Throws IllegalStateException if var1 was already linked or
// consumed, so a stage can have at most one downstream stage.
AbstractPipeline(AbstractPipeline<?, E_IN, ?> var1, int var2) {
if (var1.linkedOrConsumed) {
throw new IllegalStateException("stream has already been operated upon or closed");
} else {
var1.linkedOrConsumed = true;
var1.nextStage = this;
this.previousStage = var1;
this.sourceOrOpFlags = var2 & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(var2, var1.combinedFlags);
this.sourceStage = var1.sourceStage;
// Record on the shared source stage that the pipeline contains a stateful op.
if (this.opIsStateful()) {
this.sourceStage.sourceAnyStateful = true;
}
this.depth = var1.depth + 1;
}
}
// Runs terminal operation var1 on this pipeline. Marks the stage consumed (throwing
// if it already was) and dispatches to evaluateParallel or evaluateSequential with the
// source spliterator prepared by sourceSpliterator(terminal op flags).
final <R> R evaluate(TerminalOp<E_OUT, R> var1) {
assert this.getOutputShape() == var1.inputShape();
if (this.linkedOrConsumed) {
throw new IllegalStateException("stream has already been operated upon or closed");
} else {
this.linkedOrConsumed = true;
return this.isParallel() ? var1.evaluateParallel(this, this.sourceSpliterator(var1.getOpFlags())) : var1.evaluateSequential(this, this.sourceSpliterator(var1.getOpFlags()));
}
}
// Collects the pipeline output into a Node using array generator var1.
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> var1) {
if (this.linkedOrConsumed) {
throw new IllegalStateException("stream has already been operated upon or closed");
} else {
this.linkedOrConsumed = true;
// Parallel pipeline whose last stage is a stateful op: evaluate that op directly
// against the upstream stage; depth is reset to 0 first.
if (this.isParallel() && this.previousStage != null && this.opIsStateful()) {
this.depth = 0;
return this.opEvaluateParallel(this.previousStage, this.previousStage.sourceSpliterator(0), var1);
} else {
return this.evaluate(this.sourceSpliterator(0), true, var1);
}
}
}
// Returns the raw source spliterator. Only valid on the source stage itself (otherwise
// IllegalStateException) and only once: the stage is marked consumed and the
// spliterator/supplier reference is cleared.
final Spliterator<E_OUT> sourceStageSpliterator() {
if (this != this.sourceStage) {
throw new IllegalStateException();
} else if (this.linkedOrConsumed) {
throw new IllegalStateException("stream has already been operated upon or closed");
} else {
this.linkedOrConsumed = true;
Spliterator var1;
if (this.sourceStage.sourceSpliterator != null) {
var1 = this.sourceStage.sourceSpliterator;
this.sourceStage.sourceSpliterator = null;
return var1;
} else if (this.sourceStage.sourceSupplier != null) {
var1 = (Spliterator)this.sourceStage.sourceSupplier.get();
this.sourceStage.sourceSupplier = null;
return var1;
} else {
throw new IllegalStateException("source already consumed or closed");
}
}
}
// Switches the whole pipeline to sequential mode (the flag lives on the source stage).
// NOTE(review): returning 'this' as S presumably relies on an unchecked cast that the
// decompiler output omits -- confirm against the JDK source.
public final S sequential() {
this.sourceStage.parallel = false;
return this;
}
// Switches the whole pipeline to parallel mode (the flag lives on the source stage).
public final S parallel() {
this.sourceStage.parallel = true;
return this;
}
// Marks this stage consumed, clears this stage's source references and runs the
// registered close action. The action is cleared before it runs, so repeated close()
// calls run it only once.
public void close() {
this.linkedOrConsumed = true;
this.sourceSupplier = null;
this.sourceSpliterator = null;
if (this.sourceStage.sourceCloseAction != null) {
Runnable var1 = this.sourceStage.sourceCloseAction;
this.sourceStage.sourceCloseAction = null;
var1.run();
}
}
// Registers close handler var1 on the source stage; if one is already registered the
// two are combined via Streams.composeWithExceptions(existing, var1).
public S onClose(Runnable var1) {
Runnable var2 = this.sourceStage.sourceCloseAction;
this.sourceStage.sourceCloseAction = var2 == null ? var1 : Streams.composeWithExceptions(var2, var1);
return this;
}
// Returns a spliterator over this stage's output and marks the stage consumed.
public Spliterator<E_OUT> spliterator() {
if (this.linkedOrConsumed) {
throw new IllegalStateException("stream has already been operated upon or closed");
} else {
this.linkedOrConsumed = true;
if (this == this.sourceStage) {
// Source stage: hand out the source spliterator itself, or a lazy wrapper around
// the supplier.
if (this.sourceStage.sourceSpliterator != null) {
Spliterator var2 = this.sourceStage.sourceSpliterator;
this.sourceStage.sourceSpliterator = null;
return var2;
} else if (this.sourceStage.sourceSupplier != null) {
Supplier var1 = this.sourceStage.sourceSupplier;
this.sourceStage.sourceSupplier = null;
return this.lazySpliterator(var1);
} else {
throw new IllegalStateException("source already consumed or closed");
}
} else {
// Operation stage: wrap a supplier of the source spliterator so the pipeline's
// operations are applied when the returned spliterator is traversed.
return this.wrap(this, () -> {
return this.sourceSpliterator(0);
}, this.isParallel());
}
}
}
// Reads the parallel flag stored on the source stage.
public final boolean isParallel() {
return this.sourceStage.parallel;
}
// Converts the combined flags to stream flags via StreamOpFlag.toStreamFlags.
final int getStreamFlags() {
return StreamOpFlag.toStreamFlags(this.combinedFlags);
}
// Takes the source spliterator (directly or from the supplier; either reference is
// cleared so it can be taken only once) and prepares the pipeline for evaluation.
// var1 = flags of the terminal operation, or 0 for none.
private Spliterator<?> sourceSpliterator(int var1) {
Spliterator var2 = null;
if (this.sourceStage.sourceSpliterator != null) {
var2 = this.sourceStage.sourceSpliterator;
this.sourceStage.sourceSpliterator = null;
} else {
if (this.sourceStage.sourceSupplier == null) {
throw new IllegalStateException("source already consumed or closed");
}
var2 = (Spliterator)this.sourceStage.sourceSupplier.get();
this.sourceStage.sourceSupplier = null;
}
// Parallel pipeline containing stateful ops: walk the stages from the source towards
// this stage. var4 = stage already processed (upstream), var5 = stage being processed,
// var3 = depth to assign next.
if (this.isParallel() && this.sourceStage.sourceAnyStateful) {
int var3 = 1;
AbstractPipeline var4 = this.sourceStage;
AbstractPipeline var5 = this.sourceStage.nextStage;
for(AbstractPipeline var6 = this; var4 != var6; var5 = var5.nextStage) {
int var7 = var5.sourceOrOpFlags;
if (var5.opIsStateful()) {
// The stateful op is evaluated here and its result becomes the new source,
// so depth numbering restarts at 0 for this stage.
var3 = 0;
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(var7)) {
var7 &= ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
var2 = var5.opEvaluateParallelLazy(var4, var2);
// 64 is the value of Spliterator.SIZED (0x40): set or clear the SIZED flag to
// match what the resulting spliterator reports.
var7 = var2.hasCharacteristics(64) ? var7 & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED : var7 & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
}
var5.depth = var3++;
var5.combinedFlags = StreamOpFlag.combineOpFlags(var7, var4.combinedFlags);
var4 = var5;
}
}
// Merge the terminal operation's flags into this stage's combined flags.
if (var1 != 0) {
this.combinedFlags = StreamOpFlag.combineOpFlags(var1, this.combinedFlags);
}
return var2;
}
// Walks upstream to the stage with depth 0 and returns that stage's output shape.
final StreamShape getSourceShape() {
AbstractPipeline var1;
for(var1 = this; var1.depth > 0; var1 = var1.previousStage) {
;
}
return var1.getOutputShape();
}
// Returns var1.getExactSizeIfKnown() when the SIZED flag is known on the combined
// flags; otherwise -1.
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> var1) {
return StreamOpFlag.SIZED.isKnown(this.getStreamAndOpFlags()) ? var1.getExactSizeIfKnown() : -1L;
}
// Wraps sink var1 with this pipeline's operations, pushes all elements of spliterator
// var2 through it, and returns the original (unwrapped) sink. Throws
// NullPointerException if var1 is null.
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S var1, Spliterator<P_IN> var2) {
this.copyInto(this.wrapSink((Sink)Objects.requireNonNull(var1)), var2);
return var1;
}
// Pushes the elements of var2 into sink var1 between begin(size) and end(). When the
// SHORT_CIRCUIT flag is known, delegates to copyIntoWithCancel instead.
final <P_IN> void copyInto(Sink<P_IN> var1, Spliterator<P_IN> var2) {
Objects.requireNonNull(var1);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(this.getStreamAndOpFlags())) {
var1.begin(var2.getExactSizeIfKnown());
var2.forEachRemaining(var1);
var1.end();
} else {
this.copyIntoWithCancel(var1, var2);
}
}
// Like copyInto, but the traversal is delegated to forEachWithCancel of the depth-0
// stage found by walking upstream.
final <P_IN> void copyIntoWithCancel(Sink<P_IN> var1, Spliterator<P_IN> var2) {
AbstractPipeline var3;
for(var3 = this; var3.depth > 0; var3 = var3.previousStage) {
;
}
var1.begin(var2.getExactSizeIfKnown());
var3.forEachWithCancel(var2, var1);
var1.end();
}
// Returns the combined stream-and-operation flags of this stage.
final int getStreamAndOpFlags() {
return this.combinedFlags;
}
// True when the ORDERED flag is known on the combined flags.
final boolean isOrdered() {
return StreamOpFlag.ORDERED.isKnown(this.combinedFlags);
}
// Builds the sink chain: starting from this stage and walking upstream while
// depth > 0, each stage wraps the sink built so far via opWrapSink, passing the
// upstream stage's combined flags. The returned sink accepts source elements.
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> var1) {
Objects.requireNonNull(var1);
for(AbstractPipeline var2 = this; var2.depth > 0; var2 = var2.previousStage) {
var1 = var2.opWrapSink(var2.previousStage.combinedFlags, var1);
}
return var1;
}
// At depth 0 the source spliterator is returned unchanged; otherwise it is wrapped so
// that it yields this stage's output.
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> var1) {
return this.depth == 0 ? var1 : this.wrap(this, () -> {
return var1;
}, this.isParallel());
}
// Collects the output for spliterator var1 into a Node. Parallel: delegates to
// evaluateToNode (var2 is forwarded). Sequential: pushes the elements through the
// sink chain into a Node builder sized by exactOutputSizeIfKnown and builds the Node.
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> var1, boolean var2, IntFunction<E_OUT[]> var3) {
if (this.isParallel()) {
return this.evaluateToNode(this, var1, var2, var3);
} else {
Builder var4 = this.makeNodeBuilder(this.exactOutputSizeIfKnown(var1), var3);
return ((Builder)this.wrapAndCopyInto(var4, var1)).build();
}
}
// ---- Hooks implemented by subclasses ----
// Output shape of this stage.
abstract StreamShape getOutputShape();
// Collects pipeline output into a Node (used for parallel evaluation above).
abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> var1, Spliterator<P_IN> var2, boolean var3, IntFunction<E_OUT[]> var4);
// Creates a spliterator applying the helper's pipeline to a supplied source spliterator.
abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> var1, Supplier<Spliterator<P_IN>> var2, boolean var3);
// Creates a spliterator that obtains its underlying spliterator from a supplier.
abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> var1);
// Traverses the spliterator into the sink; used by copyIntoWithCancel.
// NOTE(review): declared void here, while the ReferencePipeline listing later in this
// article overrides it with a boolean return -- the two listings appear to come from
// different JDK versions.
abstract void forEachWithCancel(Spliterator<E_OUT> var1, Sink<E_OUT> var2);
// Creates a Node builder from a size and an array generator.
abstract Builder<E_OUT> makeNodeBuilder(long var1, IntFunction<E_OUT[]> var3);
// Whether this stage's operation is stateful.
abstract boolean opIsStateful();
// Wraps downstream sink var2 with this stage's operation; var1 = upstream combined flags.
abstract Sink<E_IN> opWrapSink(int var1, Sink<E_OUT> var2);
// Parallel evaluation of this stage's operation; the base implementation always throws
// UnsupportedOperationException (evaluateToArrayNode and sourceSpliterator only reach
// it for stages whose opIsStateful() is true).
<P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> var1, Spliterator<P_IN> var2, IntFunction<E_OUT[]> var3) {
throw new UnsupportedOperationException("Parallel evaluation is not supported");
}
// Default "lazy" parallel evaluation: runs opEvaluateParallel with an Object[] array
// generator and returns the spliterator of the resulting Node.
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> var1, Spliterator<P_IN> var2) {
return this.opEvaluateParallel(var1, var2, (var0) -> {
return (Object[])(new Object[var0]);
}).spliterator();
}
}
BaseStream源码
package java.util.stream;
import java.util.Iterator;
import java.util.Spliterator;
/**
 * Base interface of all streams.
 * T is the type of the stream elements.
 * S is the type of the stream implementing BaseStream (the self type).
 */
public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {
// Returns an iterator over the elements of this stream (element type T).
Iterator<T> iterator();
// Returns a spliterator over the elements of this stream (element type T).
Spliterator<T> spliterator();
// Returns whether this stream would run in parallel. Must be called before the
// terminal operation is invoked; calling it afterwards may give unexpected results.
boolean isParallel();
// Returns an equivalent sequential stream of type S with the same element type T;
// may return this stream itself if it is already sequential.
S sequential();
// Returns an equivalent stream that is parallel.
S parallel();
// Returns an equivalent unordered stream of type S with the same element type T;
// may return this stream itself if it is already unordered.
S unordered();
// Registers a close handler that runs when close() is called. Handlers run in the
// order they were added; even if an earlier handler throws, later handlers still run.
// The first exception is relayed to the caller and any later ones are attached to it
// as suppressed exceptions.
S onClose(Runnable var1);
// Closes this stream, causing all close handlers registered via onClose to be called.
void close();
}
Stream源码
package java.util.stream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.function.UnaryOperator;
import java.util.stream.StreamSpliterators.InfiniteSupplyingSpliterator.OfRef;
import java.util.stream.Streams.StreamBuilderImpl;
import java.util.stream.WhileOps.UnorderedWhileSpliterator.OfRef.Dropping;
import java.util.stream.WhileOps.UnorderedWhileSpliterator.OfRef.Taking;
/**
 * A sequence of elements of type {@code T} supporting intermediate operations (which
 * return another stream) and terminal operations (which produce a result or side
 * effect). Abstract methods keep the decompiler's {@code var1}-style parameter names.
 *
 * <p>Compared with the raw decompiler output this listing restores generic type
 * arguments (no raw casts), declares the locals in {@code iterate} as {@code T} instead
 * of {@code Object} (which did not compile), and replaces the magic numbers
 * {@code 9223372036854775807L} / {@code 1040} with {@code Long.MAX_VALUE} /
 * {@code Spliterator.ORDERED | Spliterator.IMMUTABLE}.
 */
public interface Stream<T> extends BaseStream<T, Stream<T>> {

    // ---- Intermediate operations ----

    Stream<T> filter(Predicate<? super T> var1);

    <R> Stream<R> map(Function<? super T, ? extends R> var1);

    IntStream mapToInt(ToIntFunction<? super T> var1);

    LongStream mapToLong(ToLongFunction<? super T> var1);

    DoubleStream mapToDouble(ToDoubleFunction<? super T> var1);

    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> var1);

    IntStream flatMapToInt(Function<? super T, ? extends IntStream> var1);

    LongStream flatMapToLong(Function<? super T, ? extends LongStream> var1);

    DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> var1);

    Stream<T> distinct();

    Stream<T> sorted();

    Stream<T> sorted(Comparator<? super T> var1);

    Stream<T> peek(Consumer<? super T> var1);

    Stream<T> limit(long var1);

    Stream<T> skip(long var1);

    /**
     * Default implementation: wraps this stream's spliterator in a {@code Taking}
     * spliterator and builds a new stream from it, keeping this stream's parallel
     * mode. Closing the returned stream closes this stream.
     *
     * @throws NullPointerException if {@code predicate} is null
     */
    default Stream<T> takeWhile(Predicate<? super T> predicate) {
        Objects.requireNonNull(predicate);
        return StreamSupport.stream(new Taking<>(this.spliterator(), true, predicate), this.isParallel())
                .onClose(this::close);
    }

    /**
     * Default implementation: wraps this stream's spliterator in a {@code Dropping}
     * spliterator and builds a new stream from it, keeping this stream's parallel
     * mode. Closing the returned stream closes this stream.
     *
     * @throws NullPointerException if {@code predicate} is null
     */
    default Stream<T> dropWhile(Predicate<? super T> predicate) {
        Objects.requireNonNull(predicate);
        return StreamSupport.stream(new Dropping<>(this.spliterator(), true, predicate), this.isParallel())
                .onClose(this::close);
    }

    // ---- Terminal operations ----

    void forEach(Consumer<? super T> var1);

    void forEachOrdered(Consumer<? super T> var1);

    Object[] toArray();

    <A> A[] toArray(IntFunction<A[]> var1);

    T reduce(T var1, BinaryOperator<T> var2);

    Optional<T> reduce(BinaryOperator<T> var1);

    <U> U reduce(U var1, BiFunction<U, ? super T, U> var2, BinaryOperator<U> var3);

    <R> R collect(Supplier<R> var1, BiConsumer<R, ? super T> var2, BiConsumer<R, R> var3);

    <R, A> R collect(Collector<? super T, A, R> var1);

    Optional<T> min(Comparator<? super T> var1);

    Optional<T> max(Comparator<? super T> var1);

    long count();

    boolean anyMatch(Predicate<? super T> var1);

    boolean allMatch(Predicate<? super T> var1);

    boolean noneMatch(Predicate<? super T> var1);

    Optional<T> findFirst();

    Optional<T> findAny();

    // ---- Static factories ----

    /** Returns a new, empty stream builder. */
    static <T> Stream.Builder<T> builder() {
        return new StreamBuilderImpl<>();
    }

    /** Returns a sequential stream over an empty spliterator. */
    static <T> Stream<T> empty() {
        return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
    }

    /** Returns a sequential stream containing the single element {@code t}. */
    static <T> Stream<T> of(T t) {
        return StreamSupport.stream(new StreamBuilderImpl<>(t), false);
    }

    /** Returns an empty stream when {@code t} is null, otherwise a one-element stream. */
    static <T> Stream<T> ofNullable(T t) {
        return t == null ? Stream.<T>empty() : StreamSupport.stream(new StreamBuilderImpl<>(t), false);
    }

    /** Returns a stream over the given values, delegating to {@code Arrays.stream}. */
    @SafeVarargs
    static <T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }

    /**
     * Returns an unbounded sequential stream {@code seed, f(seed), f(f(seed)), ...}.
     * The first element is {@code seed}; every later element is {@code f} applied to
     * the previously emitted element. {@code tryAdvance} never returns false.
     *
     * @throws NullPointerException if {@code f} is null
     */
    static <T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        Spliterator<T> spliterator =
                new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                T t;
                if (this.started) {
                    t = f.apply(this.prev);
                } else {
                    // First call: emit the seed itself.
                    t = seed;
                    this.started = true;
                }
                action.accept(this.prev = t);
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    /**
     * Returns a sequential stream {@code seed, next(seed), ...} that ends as soon as
     * {@code hasNext} rejects the next candidate element.
     *
     * @throws NullPointerException if {@code next} or {@code hasNext} is null
     */
    static <T> Stream<T> iterate(final T seed, final Predicate<? super T> hasNext, final UnaryOperator<T> next) {
        Objects.requireNonNull(next);
        Objects.requireNonNull(hasNext);
        Spliterator<T> spliterator =
                new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started;
            boolean finished;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (this.finished) {
                    return false;
                }
                T t;
                if (this.started) {
                    t = next.apply(this.prev);
                } else {
                    t = seed;
                    this.started = true;
                }
                if (!hasNext.test(t)) {
                    // Drop the reference to the last element and stop for good.
                    this.prev = null;
                    this.finished = true;
                    return false;
                }
                action.accept(this.prev = t);
                return true;
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (!this.finished) {
                    // Mark finished up front so later calls see an exhausted spliterator.
                    this.finished = true;
                    T t = this.started ? next.apply(this.prev) : seed;
                    for (this.prev = null; hasNext.test(t); t = next.apply(t)) {
                        action.accept(t);
                    }
                }
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    /**
     * Returns a sequential stream backed by an {@code OfRef} supplying spliterator
     * created with size {@code Long.MAX_VALUE} and supplier {@code s}.
     *
     * @throws NullPointerException if {@code s} is null
     */
    static <T> Stream<T> generate(Supplier<? extends T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(new OfRef<>(Long.MAX_VALUE, s), false);
    }

    /**
     * Concatenates two streams. The result is parallel if either input is parallel,
     * and closing it runs {@code Streams.composedClose(a, b)}.
     *
     * @throws NullPointerException if {@code a} or {@code b} is null
     */
    static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
        Objects.requireNonNull(a);
        Objects.requireNonNull(b);
        // Narrowing Spliterator<? extends T> to Spliterator<T> is safe here because the
        // spliterators are only read from.
        @SuppressWarnings("unchecked")
        Spliterator<T> split = new java.util.stream.Streams.ConcatSpliterator.OfRef<>(
                (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
        Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
        return stream.onClose(Streams.composedClose(a, b));
    }

    /** Mutable builder of a {@code Stream}: add elements, then call {@link #build()}. */
    public interface Builder<T> extends Consumer<T> {
        @Override
        void accept(T var1);

        /** Adds {@code t} via {@link #accept(Object)} and returns this builder for chaining. */
        default Stream.Builder<T> add(T t) {
            this.accept(t);
            return this;
        }

        Stream<T> build();
    }
}
ReferencePipeline源码
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package java.util.stream;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector.Characteristics;
import java.util.stream.MatchOps.MatchKind;
import java.util.stream.Node.Builder;
import java.util.stream.Sink.ChainedReference;
import java.util.stream.StreamSpliterators.DelegatingSpliterator;
import java.util.stream.StreamSpliterators.WrappingSpliterator;
abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> implements Stream<P_OUT> {
/** Source-stage constructor: forwards a lazily supplied source spliterator, the source flags and the parallel flag to AbstractPipeline. */
ReferencePipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/** Source-stage constructor: forwards the source spliterator, the source flags and the parallel flag to AbstractPipeline. */
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/** Operation-stage constructor: appends this stage after {@code upstream} with the given operation flags. */
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
/** Every ReferencePipeline stage outputs object references, so the shape is always StreamShape.REFERENCE. */
final StreamShape getOutputShape() {
return StreamShape.REFERENCE;
}
/**
 * Collects the output of {@code helper} applied to {@code spliterator} into a Node by
 * delegating to Nodes.collect with the same arguments.
 * Note: the method-level type parameter P_IN shadows the class-level P_IN.
 */
final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<P_OUT[]> generator) {
return Nodes.collect(helper, spliterator, flattenTree, generator);
}
/**
 * Creates a WrappingSpliterator that applies the pipeline described by {@code ph} to
 * the spliterator obtained from {@code supplier}.
 * The diamond replaces the raw-type construction left by the decompiler, so the result
 * is checked against the declared return type.
 *
 * @param ph the pipeline helper describing the stages to apply
 * @param supplier supplies the source spliterator
 * @param isParallel forwarded unchanged to the WrappingSpliterator constructor
 */
final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel) {
    return new WrappingSpliterator<>(ph, supplier, isParallel);
}
/**
 * Creates a DelegatingSpliterator around {@code supplier}.
 * The diamond replaces the raw-type construction left by the decompiler.
 *
 * @param supplier supplies the underlying spliterator
 */
final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
    return new DelegatingSpliterator<>(supplier);
}
/**
 * Feeds elements from {@code spliterator} into {@code sink} one at a time, stopping
 * when the sink requests cancellation or the spliterator is exhausted.
 *
 * NOTE(review): the AbstractPipeline listing earlier in this article declares this
 * method as {@code void}, whereas this override returns {@code boolean}; the two
 * listings appear to come from different JDK versions.
 *
 * @return the last value read from {@code sink.cancellationRequested()}: true if
 *         traversal stopped because the sink asked for cancellation
 */
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do {
        // Ask the sink before every element, including the first.
        cancelled = sink.cancellationRequested();
    } while (!cancelled && spliterator.tryAdvance(sink));
    return cancelled;
}
/** Creates a Node builder by delegating to Nodes.builder with the given size and array generator. */
final Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
return Nodes.builder(exactSizeIfKnown, generator);
}
/** Returns an Iterator view of this stream's spliterator (obtained via spliterator()), adapted by Spliterators.iterator. */
public final Iterator<P_OUT> iterator() {
return Spliterators.iterator(this.spliterator());
}
/**
 * Returns an unordered view of this stream. If the pipeline is already unordered this
 * stage is returned as-is; otherwise a stateless stage carrying the NOT_ORDERED flag is
 * appended whose sink passes elements through unchanged.
 */
public Stream<P_OUT> unordered() {
    if (!this.isOrdered()) {
        // Nothing to do: ordering is not known on this pipeline.
        return this;
    }
    return new ReferencePipeline.StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            // The stage only changes flags; elements flow straight to the downstream sink.
            return sink;
        }
    };
}
/**
 * Appends a stateless stage (flag NOT_SIZED) that forwards only the elements accepted
 * by {@code predicate}.
 *
 * @throws NullPointerException if {@code predicate} is null
 */
public final Stream<P_OUT> filter(final Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    final int opFlags = StreamOpFlag.NOT_SIZED;
    return new ReferencePipeline.StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, opFlags) {
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new ChainedReference<P_OUT, P_OUT>(sink) {
                public void begin(long size) {
                    // Elements may be dropped, so the incoming size is not passed on.
                    downstream.begin(-1L);
                }

                public void accept(P_OUT element) {
                    if (!predicate.test(element)) {
                        return;
                    }
                    downstream.accept(element);
                }
            };
        }
    };
}
/**
 * Appends a stateless stage (flags NOT_SORTED | NOT_DISTINCT) that applies
 * {@code mapper} to every element and forwards the result downstream.
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final <R> Stream<R> map(final Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    final int opFlags = StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT;
    return new ReferencePipeline.StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, opFlags) {
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new ChainedReference<P_OUT, R>(sink) {
                public void accept(P_OUT element) {
                    R mapped = mapper.apply(element);
                    downstream.accept(mapped);
                }
            };
        }
    };
}
/**
 * Appends a stateless IntPipeline stage (flags NOT_SORTED | NOT_DISTINCT) that converts
 * every element to an {@code int} with {@code mapper}.
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final IntStream mapToInt(final ToIntFunction<? super P_OUT> mapper) {
    Objects.requireNonNull(mapper);
    final int opFlags = StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT;
    return new java.util.stream.IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, opFlags) {
        Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
            return new ChainedReference<P_OUT, Integer>(sink) {
                public void accept(P_OUT element) {
                    int mapped = mapper.applyAsInt(element);
                    downstream.accept(mapped);
                }
            };
        }
    };
}
/**
 * Appends a stateless LongPipeline stage (flags NOT_SORTED | NOT_DISTINCT) that
 * converts every element to a {@code long} with {@code mapper}.
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final LongStream mapToLong(final ToLongFunction<? super P_OUT> mapper) {
    Objects.requireNonNull(mapper);
    final int opFlags = StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT;
    return new java.util.stream.LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, opFlags) {
        Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
            return new ChainedReference<P_OUT, Long>(sink) {
                public void accept(P_OUT element) {
                    long mapped = mapper.applyAsLong(element);
                    downstream.accept(mapped);
                }
            };
        }
    };
}
/**
 * Appends a stateless DoublePipeline stage (flags NOT_SORTED | NOT_DISTINCT) that
 * converts every element to a {@code double} with {@code mapper}.
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final DoubleStream mapToDouble(final ToDoubleFunction<? super P_OUT> mapper) {
    Objects.requireNonNull(mapper);
    final int opFlags = StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT;
    return new java.util.stream.DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, opFlags) {
        Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
            return new ChainedReference<P_OUT, Double>(sink) {
                public void accept(P_OUT element) {
                    double mapped = mapper.applyAsDouble(element);
                    downstream.accept(mapped);
                }
            };
        }
    };
}
/**
 * Appends a stateless stage (flags NOT_SORTED | NOT_DISTINCT | NOT_SIZED) that maps
 * every element to a stream and pushes that stream's elements downstream.
 *
 * <p>The hand-expanded close logic produced by the decompiler is written as the
 * equivalent try-with-resources: each mapped stream is closed after its elements were
 * forwarded, and an exception thrown by close() is added as suppressed if the body
 * already failed. A null mapped stream is skipped (and not closed).
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    return new ReferencePipeline.StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new ChainedReference<P_OUT, R>(sink) {
                public void begin(long size) {
                    // One input element can expand to any number of outputs: size unknown.
                    this.downstream.begin(-1L);
                }

                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        if (result != null) {
                            // Always drain the inner stream sequentially into the downstream sink.
                            result.sequential().forEach(this.downstream);
                        }
                    }
                }
            };
        }
    };
}
/**
 * Appends a stateless IntPipeline stage (flags NOT_SORTED | NOT_DISTINCT | NOT_SIZED)
 * that maps every element to an IntStream and pushes its values downstream.
 *
 * <p>The decompiler's expanded close logic is written as the equivalent
 * try-with-resources, and the synthetic {@code var10001} initializer block as a plain
 * field initializer. A null mapped stream is skipped (and not closed).
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final IntStream flatMapToInt(final Function<? super P_OUT, ? extends IntStream> mapper) {
    Objects.requireNonNull(mapper);
    return new java.util.stream.IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
            return new ChainedReference<P_OUT, Integer>(sink) {
                // Created once per sink and reused for every inner stream.
                IntConsumer downstreamAsInt = this.downstream::accept;

                public void begin(long size) {
                    // One input element can expand to any number of outputs: size unknown.
                    this.downstream.begin(-1L);
                }

                public void accept(P_OUT u) {
                    try (IntStream result = mapper.apply(u)) {
                        if (result != null) {
                            result.sequential().forEach(this.downstreamAsInt);
                        }
                    }
                }
            };
        }
    };
}
/**
 * Appends a stateless DoublePipeline stage (flags NOT_SORTED | NOT_DISTINCT | NOT_SIZED)
 * that maps every element to a DoubleStream and pushes its values downstream.
 *
 * <p>The decompiler's expanded close logic is written as the equivalent
 * try-with-resources, and the synthetic {@code var10001} initializer block as a plain
 * field initializer. A null mapped stream is skipped (and not closed).
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final DoubleStream flatMapToDouble(final Function<? super P_OUT, ? extends DoubleStream> mapper) {
    Objects.requireNonNull(mapper);
    return new java.util.stream.DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
            return new ChainedReference<P_OUT, Double>(sink) {
                // Created once per sink and reused for every inner stream.
                DoubleConsumer downstreamAsDouble = this.downstream::accept;

                public void begin(long size) {
                    // One input element can expand to any number of outputs: size unknown.
                    this.downstream.begin(-1L);
                }

                public void accept(P_OUT u) {
                    try (DoubleStream result = mapper.apply(u)) {
                        if (result != null) {
                            result.sequential().forEach(this.downstreamAsDouble);
                        }
                    }
                }
            };
        }
    };
}
/**
 * Appends a stateless LongPipeline stage (flags NOT_SORTED | NOT_DISTINCT | NOT_SIZED)
 * that maps every element to a LongStream and pushes its values downstream.
 *
 * <p>The decompiler's expanded close logic is written as the equivalent
 * try-with-resources, and the synthetic {@code var10001} initializer block as a plain
 * field initializer. A null mapped stream is skipped (and not closed).
 *
 * @throws NullPointerException if {@code mapper} is null
 */
public final LongStream flatMapToLong(final Function<? super P_OUT, ? extends LongStream> mapper) {
    Objects.requireNonNull(mapper);
    return new java.util.stream.LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
            return new ChainedReference<P_OUT, Long>(sink) {
                // Created once per sink and reused for every inner stream.
                LongConsumer downstreamAsLong = this.downstream::accept;

                public void begin(long size) {
                    // One input element can expand to any number of outputs: size unknown.
                    this.downstream.begin(-1L);
                }

                public void accept(P_OUT u) {
                    try (LongStream result = mapper.apply(u)) {
                        if (result != null) {
                            result.sequential().forEach(this.downstreamAsLong);
                        }
                    }
                }
            };
        }
    };
}
/**
 * Appends a stateless stage (no flags) that runs {@code action} on every element and
 * then forwards the same element downstream.
 *
 * @throws NullPointerException if {@code action} is null
 */
public final Stream<P_OUT> peek(final Consumer<? super P_OUT> action) {
    Objects.requireNonNull(action);
    final int noFlags = 0;
    return new ReferencePipeline.StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, noFlags) {
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new ChainedReference<P_OUT, P_OUT>(sink) {
                public void accept(P_OUT element) {
                    // The observer sees the element before the downstream sink does.
                    action.accept(element);
                    downstream.accept(element);
                }
            };
        }
    };
}
/** Appends the stage created by DistinctOps.makeRef for this pipeline. */
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
/** Appends the stage created by SortedOps.makeRef(this); no comparator is supplied (presumably natural ordering -- confirm in SortedOps). */
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}
/** Appends the stage created by SortedOps.makeRef for this pipeline and the given comparator. */
public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
return SortedOps.makeRef(this, comparator);
}
/**
 * Appends a slice stage created by SliceOps.makeRef(this, 0, maxSize).
 *
 * @param maxSize must not be negative
 * @throws IllegalArgumentException if {@code maxSize} is negative; the message is the
 *         offending value
 */
public final Stream<P_OUT> limit(long maxSize) {
    if (maxSize < 0L) {
        throw new IllegalArgumentException(Long.toString(maxSize));
    }
    return SliceOps.makeRef(this, 0L, maxSize);
}
/**
 * Appends a slice stage created by SliceOps.makeRef(this, n, -1); skipping zero
 * elements returns this stage unchanged.
 *
 * @param n must not be negative
 * @throws IllegalArgumentException if {@code n} is negative; the message is the
 *         offending value
 */
public final Stream<P_OUT> skip(long n) {
    if (n < 0L) {
        throw new IllegalArgumentException(Long.toString(n));
    }
    if (n == 0L) {
        // Nothing to skip: no new stage is needed.
        return this;
    }
    return SliceOps.makeRef(this, n, -1L);
}
/** Overrides the Stream default by appending the stage created by WhileOps.makeTakeWhileRef for this pipeline and predicate. */
public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {
return WhileOps.makeTakeWhileRef(this, predicate);
}
/** Overrides the Stream default by appending the stage created by WhileOps.makeDropWhileRef for this pipeline and predicate. */
public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {
return WhileOps.makeDropWhileRef(this, predicate);
}
/** Terminal operation: evaluates ForEachOps.makeRef(action, false); the false flag presumably means encounter order need not be respected -- confirm in ForEachOps. */
public void forEach(Consumer<? super P_OUT> action) {
this.evaluate(ForEachOps.makeRef(action, false));
}
/** Terminal operation: evaluates ForEachOps.makeRef(action, true); the true flag presumably requests processing in encounter order -- confirm in ForEachOps. */
public void forEachOrdered(Consumer<? super P_OUT> action) {
this.evaluate(ForEachOps.makeRef(action, true));
}
// Terminal operation: evaluates the pipeline into a node via evaluateToArrayNode,
// flattens that node with Nodes.flatten, and copies it into an array obtained
// from generator.
// NOTE(review): this is decompiler output; the same generator is passed to all
// three calls, which only lines up after type erasure - confirm against the
// original JDK source before refactoring.
public final <A> A[] toArray(IntFunction<A[]> generator) {
return Nodes.flatten(this.evaluateToArrayNode(generator), generator).asArray(generator);
}
/**
 * Terminal operation: collects the elements into an {@code Object[]} by
 * delegating to {@code toArray(IntFunction)} with an {@code Object[]} factory.
 *
 * @return the array returned by the delegate
 */
public final Object[] toArray() {
    return this.toArray(Object[]::new);
}
/**
 * Terminal operation: evaluates a match op of kind {@code MatchKind.ANY}.
 *
 * @param predicate test handed to {@code MatchOps.makeRef}
 * @return the boolean produced by the match op
 */
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
    final Boolean matched = (Boolean) this.evaluate(MatchOps.makeRef(predicate, MatchKind.ANY));
    return matched;
}
/**
 * Terminal operation: evaluates a match op of kind {@code MatchKind.ALL}.
 *
 * @param predicate test handed to {@code MatchOps.makeRef}
 * @return the boolean produced by the match op
 */
public final boolean allMatch(Predicate<? super P_OUT> predicate) {
    final Boolean matched = (Boolean) this.evaluate(MatchOps.makeRef(predicate, MatchKind.ALL));
    return matched;
}
/**
 * Terminal operation: evaluates a match op of kind {@code MatchKind.NONE}.
 *
 * @param predicate test handed to {@code MatchOps.makeRef}
 * @return the boolean produced by the match op
 */
public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
    final Boolean matched = (Boolean) this.evaluate(MatchOps.makeRef(predicate, MatchKind.NONE));
    return matched;
}
/**
 * Terminal operation: evaluates a find op created with the flag {@code true}.
 *
 * @return the {@code Optional} produced by the find op
 */
public final Optional<P_OUT> findFirst() {
    final boolean mustFindFirst = true;
    return (Optional) this.evaluate(FindOps.makeRef(mustFindFirst));
}
/**
 * Terminal operation: evaluates a find op created with the flag {@code false}.
 *
 * @return the {@code Optional} produced by the find op
 */
public final Optional<P_OUT> findAny() {
    final boolean mustFindFirst = false;
    return (Optional) this.evaluate(FindOps.makeRef(mustFindFirst));
}
/**
 * Terminal operation: reduces the elements starting from {@code identity}.
 * The same {@code accumulator} is passed to {@code ReduceOps.makeRef} twice,
 * once as the reducer and once as the combiner.
 *
 * @param identity    seed value of the reduction
 * @param accumulator function used both to fold elements and to combine partial results
 * @return the value produced by evaluating the reduce op
 */
public final P_OUT reduce(P_OUT identity, BinaryOperator<P_OUT> accumulator) {
    final BinaryOperator<P_OUT> combiner = accumulator;
    return this.evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}
/**
 * Terminal operation: reduces the elements without a seed value by evaluating
 * the op built by {@code ReduceOps.makeRef(accumulator)}.
 *
 * @param accumulator function used to fold elements
 * @return the {@code Optional} produced by the reduce op
 */
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    final Object reduced = this.evaluate(ReduceOps.makeRef(accumulator));
    return (Optional) reduced;
}
/**
 * Terminal operation: reduces the elements into a value of type {@code R}
 * using a separate accumulator and combiner.
 *
 * @param identity    seed value of the reduction
 * @param accumulator folds one element into a partial result
 * @param combiner    merges two partial results
 * @return the value produced by evaluating the reduce op
 */
public final <R> R reduce(R identity,
                          BiFunction<R, ? super P_OUT, R> accumulator,
                          BinaryOperator<R> combiner) {
    return this.evaluate(
            ReduceOps.makeRef(identity, accumulator, combiner));
}
/**
 * Terminal operation: accumulates the elements with {@code collector}.
 *
 * <p>When the stream is parallel, the collector is {@code CONCURRENT}, and
 * either the stream is unordered or the collector is {@code UNORDERED}, a
 * single container is created and every element is accumulated into it via
 * {@code forEach}. In every other case the pipeline is evaluated with the
 * reduce op built by {@code ReduceOps.makeRef(collector)}.
 *
 * <p>The container is typed as {@code A} (the decompiled form used
 * {@code Object}, which does not type-check against the accumulator and
 * finisher).
 *
 * @param collector supplies the container, accumulator, finisher and characteristics
 * @return the container itself for {@code IDENTITY_FINISH} collectors,
 *         otherwise the result of applying the finisher to it
 */
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (this.isParallel()
            && collector.characteristics().contains(Characteristics.CONCURRENT)
            && (!this.isOrdered() || collector.characteristics().contains(Characteristics.UNORDERED))) {
        // Concurrent path: all threads accumulate into one shared container.
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        this.forEach((u) -> {
            accumulator.accept(container, u);
        });
    } else {
        container = this.evaluate(ReduceOps.makeRef(collector));
    }
    // IDENTITY_FINISH means the container already is the result, so only a cast is needed.
    return collector.characteristics().contains(Characteristics.IDENTITY_FINISH)
            ? (R) container
            : collector.finisher().apply(container);
}
/**
 * Terminal operation: mutable reduction described by three functions.
 *
 * @param supplier    creates a result container
 * @param accumulator folds one element into a container
 * @param combiner    merges the second container into the first
 * @return the value produced by evaluating the reduce op
 */
public final <R> R collect(Supplier<R> supplier,
                           BiConsumer<R, ? super P_OUT> accumulator,
                           BiConsumer<R, R> combiner) {
    return this.evaluate(
            ReduceOps.makeRef(supplier, accumulator, combiner));
}
/**
 * Terminal operation: reduces the elements with {@code BinaryOperator.maxBy(comparator)}.
 *
 * @param comparator ordering used to pick the greater of two elements
 * @return the result of the reduction
 */
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    final BinaryOperator<P_OUT> keepGreater = BinaryOperator.maxBy(comparator);
    return this.reduce(keepGreater);
}
/**
 * Terminal operation: reduces the elements with {@code BinaryOperator.minBy(comparator)}.
 *
 * @param comparator ordering used to pick the lesser of two elements
 * @return the result of the reduction
 */
public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
    final BinaryOperator<P_OUT> keepLesser = BinaryOperator.minBy(comparator);
    return this.reduce(keepLesser);
}
/**
 * Terminal operation: evaluates the counting op built by
 * {@code ReduceOps.makeRefCounting()} and unboxes its {@code Long} result.
 *
 * @return the value produced by the counting op
 */
public final long count() {
    final Long total = (Long) this.evaluate(ReduceOps.makeRefCounting());
    return total;
}
//内部类实现3:StatefulOp 有状态操作
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
final boolean opIsStateful() {
return true;
}
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> var1, Spliterator<P_IN> var2, IntFunction<E_OUT[]> var3);
}
//内部类实现2:StatelessOp 无状态操作
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
final boolean opIsStateful() {
return false;
}
}
//内部类实现1:Head 初始化
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
Head(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
final boolean opIsStateful() {
throw new UnsupportedOperationException();
}
final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
throw new UnsupportedOperationException();
}
public void forEach(Consumer<? super E_OUT> action) {
if (!this.isParallel()) {
this.sourceStageSpliterator().forEachRemaining(action);
} else {
super.forEach(action);
}
}
public void forEachOrdered(Consumer<? super E_OUT> action) {
if (!this.isParallel()) {
this.sourceStageSpliterator().forEachRemaining(action);
} else {
super.forEachOrdered(action);
}
}
}
}
Collector源码
Collector接口包含了一系列方法,为实现具体的归约/聚合操作(即收集器)提供了范本。我们已经看过了Collectors工具类中实现的许多收集器,例如toList或groupingBy。这也意味着你可以为Collector接口提供自己的实现,从而自由创建自定义归约操作。
Collector接口
package java.util.stream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors.CollectorImpl;
/**
 * Describes a mutable reduction: how to create a container, fold elements
 * into it, merge two containers and turn the container into the final result.
 *
 * @param <T> type of the input elements
 * @param <A> type of the intermediate container
 * @param <R> type of the final result
 */
public interface Collector<T, A, R> {

    /** Creates a new container. */
    Supplier<A> supplier();

    /** Folds one element into a container. */
    BiConsumer<A, T> accumulator();

    /** Merges two containers into one. */
    BinaryOperator<A> combiner();

    /** Converts the container into the final result. */
    Function<A, R> finisher();

    /** Characteristics declared by this collector. */
    Set<Collector.Characteristics> characteristics();

    /**
     * Builds a collector whose container is also its result. The returned
     * collector always carries {@code IDENTITY_FINISH} in addition to the
     * given characteristics.
     *
     * @throws NullPointerException if any argument is null
     */
    static <T, R> Collector<T, R, R> of(Supplier<R> supplier,
                                        BiConsumer<R, T> accumulator,
                                        BinaryOperator<R> combiner,
                                        Collector.Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(characteristics);
        final Set<Collector.Characteristics> traits;
        if (characteristics.length == 0) {
            // No extras requested: reuse the shared IDENTITY_FINISH-only set.
            traits = Collectors.CH_ID;
        } else {
            traits = Collections.unmodifiableSet(
                    EnumSet.of(Collector.Characteristics.IDENTITY_FINISH, characteristics));
        }
        return new CollectorImpl<>(supplier, accumulator, combiner, traits);
    }

    /**
     * Builds a collector with an explicit finisher. The returned collector
     * carries exactly the given characteristics.
     *
     * @throws NullPointerException if any argument is null
     */
    static <T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                           BiConsumer<A, T> accumulator,
                                           BinaryOperator<A> combiner,
                                           Function<A, R> finisher,
                                           Collector.Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(finisher);
        Objects.requireNonNull(characteristics);
        Set<Collector.Characteristics> traits = Collectors.CH_NOID;
        if (characteristics.length > 0) {
            EnumSet<Collector.Characteristics> requested = EnumSet.noneOf(Collector.Characteristics.class);
            Collections.addAll(requested, characteristics);
            traits = Collections.unmodifiableSet(requested);
        }
        return new CollectorImpl<>(supplier, accumulator, combiner, finisher, traits);
    }

    /** Flags a collector can declare about itself. */
    enum Characteristics {
        CONCURRENT,
        UNORDERED,
        IDENTITY_FINISH
    }
}
Collectors实现
通过内部类CollectorImpl实现了Collector接口。
package java.util.stream;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IntSummaryStatistics;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector.Characteristics;
public final class Collectors {
static final Set<Characteristics> CH_CONCURRENT_ID;
static final Set<Characteristics> CH_CONCURRENT_NOID;
static final Set<Characteristics> CH_ID;
static final Set<Characteristics> CH_UNORDERED_ID;
static final Set<Characteristics> CH_NOID;
private Collectors() {
}
private static <T> BinaryOperator<T> throwingMerger() {
return (var0, var1) -> {
throw new IllegalStateException(String.format("Duplicate key %s", var0));
};
}
private static <I, R> Function<I, R> castingIdentity() {
return (var0) -> {
return var0;
};
}
public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> var0) {
return new Collectors.CollectorImpl(var0, Collection::add, (var0x, var1) -> {
var0x.addAll(var1);
return var0x;
}, CH_ID);
}
//常用的toList方法(聚合操作)。
public static <T> Collector<T, ?, List<T>> toList() {
return new Collectors.CollectorImpl(ArrayList::new, List::add, (var0, var1) -> {
var0.addAll(var1);
return var0;
}, CH_ID);
}
public static <T> Collector<T, ?, Set<T>> toSet() {
return new Collectors.CollectorImpl(HashSet::new, Set::add, (var0, var1) -> {
var0.addAll(var1);
return var0;
}, CH_UNORDERED_ID);
}
public static Collector<CharSequence, ?, String> joining() {
return new Collectors.CollectorImpl(StringBuilder::new, StringBuilder::append, (var0, var1) -> {
var0.append(var1);
return var0;
}, StringBuilder::toString, CH_NOID);
}
public static Collector<CharSequence, ?, String> joining(CharSequence var0) {
return joining(var0, "", "");
}
public static Collector<CharSequence, ?, String> joining(CharSequence var0, CharSequence var1, CharSequence var2) {
return new Collectors.CollectorImpl(() -> {
return new StringJoiner(var0, var1, var2);
}, StringJoiner::add, StringJoiner::merge, StringJoiner::toString, CH_NOID);
}
private static <K, V, M extends Map<K, V>> BinaryOperator<M> mapMerger(BinaryOperator<V> var0) {
return (var1, var2) -> {
Iterator var3 = var2.entrySet().iterator();
while(var3.hasNext()) {
Entry var4 = (Entry)var3.next();
var1.merge(var4.getKey(), var4.getValue(), var0);
}
return var1;
};
}
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> var0, Collector<? super U, A, R> var1) {
BiConsumer var2 = var1.accumulator();
return new Collectors.CollectorImpl(var1.supplier(), (var2x, var3) -> {
var2.accept(var2x, var0.apply(var3));
}, var1.combiner(), var1.finisher(), var1.characteristics());
}
public static <T, A, R, RR> Collector<T, A, RR> collectingAndThen(Collector<T, A, R> var0, Function<R, RR> var1) {
Set var2 = var0.characteristics();
if (var2.contains(Characteristics.IDENTITY_FINISH)) {
if (var2.size() == 1) {
var2 = CH_NOID;
} else {
EnumSet var3 = EnumSet.copyOf(var2);
var3.remove(Characteristics.IDENTITY_FINISH);
var2 = Collections.unmodifiableSet(var3);
}
}
return new Collectors.CollectorImpl(var0.supplier(), var0.accumulator(), var0.combiner(), var0.finisher().andThen(var1), var2);
}
public static <T> Collector<T, ?, Long> counting() {
return reducing(0L, (var0) -> {
return 1L;
}, Long::sum);
}
public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> var0) {
return reducing(BinaryOperator.minBy(var0));
}
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> var0) {
return reducing(BinaryOperator.maxBy(var0));
}
public static <T> Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> var0) {
return new Collectors.CollectorImpl(() -> {
return new int[1];
}, (var1, var2) -> {
var1[0] += var0.applyAsInt(var2);
}, (var0x, var1) -> {
var0x[0] += var1[0];
return var0x;
}, (var0x) -> {
return var0x[0];
}, CH_NOID);
}
public static <T> Collector<T, ?, Long> summingLong(ToLongFunction<? super T> var0) {
return new Collectors.CollectorImpl(() -> {
return new long[1];
}, (var1, var2) -> {
var1[0] += var0.applyAsLong(var2);
}, (var0x, var1) -> {
var0x[0] += var1[0];
return var0x;
}, (var0x) -> {
return var0x[0];
}, CH_NOID);
}
public static <T> Collector<T, ?, Double> summingDouble(ToDoubleFunction<? super T> var0) {
return new Collectors.CollectorImpl(() -> {
return new double[3];
}, (var1, var2) -> {
sumWithCompensation(var1, var0.applyAsDouble(var2));
var1[2] += var0.applyAsDouble(var2);
}, (var0x, var1) -> {
sumWithCompensation(var0x, var1[0]);
var0x[2] += var1[2];
return sumWithCompensation(var0x, var1[1]);
}, (var0x) -> {
return computeFinalSum(var0x);
}, CH_NOID);
}
static double[] sumWithCompensation(double[] var0, double var1) {
double var3 = var1 - var0[1];
double var5 = var0[0];
double var7 = var5 + var3;
var0[1] = var7 - var5 - var3;
var0[0] = var7;
return var0;
}
static double computeFinalSum(double[] var0) {
double var1 = var0[0] + var0[1];
double var3 = var0[var0.length - 1];
return Double.isNaN(var1) && Double.isInfinite(var3) ? var3 : var1;
}
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> var0) {
return new Collectors.CollectorImpl(() -> {
return new long[2];
}, (var1, var2) -> {
var1[0] += (long)var0.applyAsInt(var2);
++var1[1];
}, (var0x, var1) -> {
var0x[0] += var1[0];
var0x[1] += var1[1];
return var0x;
}, (var0x) -> {
return var0x[1] == 0L ? 0.0D : (double)var0x[0] / (double)var0x[1];
}, CH_NOID);
}
public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> var0) {
return new Collectors.CollectorImpl(() -> {
return new long[2];
}, (var1, var2) -> {
var1[0] += var0.applyAsLong(var2);
++var1[1];
}, (var0x, var1) -> {
var0x[0] += var1[0];
var0x[1] += var1[1];
return var0x;
}, (var0x) -> {
return var0x[1] == 0L ? 0.0D : (double)var0x[0] / (double)var0x[1];
}, CH_NOID);
}
public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> var0) {
return new Collectors.CollectorImpl(() -> {
return new double[4];
}, (var1, var2) -> {
sumWithCompensation(var1, var0.applyAsDouble(var2));
++var1[2];
var1[3] += var0.applyAsDouble(var2);
}, (var0x, var1) -> {
sumWithCompensation(var0x, var1[0]);
sumWithCompensation(var0x, var1[1]);
var0x[2] += var1[2];
var0x[3] += var1[3];
return var0x;
}, (var0x) -> {
return var0x[2] == 0.0D ? 0.0D : computeFinalSum(var0x) / var0x[2];
}, CH_NOID);
}
public static <T> Collector<T, ?, T> reducing(T var0, BinaryOperator<T> var1) {
return new Collectors.CollectorImpl(boxSupplier(var0), (var1x, var2) -> {
var1x[0] = var1.apply(var1x[0], var2);
}, (var1x, var2) -> {
var1x[0] = var1.apply(var1x[0], var2[0]);
return var1x;
}, (var0x) -> {
return var0x[0];
}, CH_NOID);
}
private static <T> Supplier<T[]> boxSupplier(T var0) {
return () -> {
return (Object[])(new Object[]{var0});
};
}
public static <T> Collector<T, ?, Optional<T>> reducing(BinaryOperator<T> var0) {
return new Collectors.CollectorImpl(() -> {
class OptionalBox implements Consumer<T> {
T value = null;
boolean present = false;
OptionalBox() {
}
public void accept(T var1) {
if (this.present) {
this.value = var0.apply(this.value, var1);
} else {
this.value = var1;
this.present = true;
}
}
}
return new OptionalBox();
}, OptionalBox::accept, (var0x, var1) -> {
if (var1.present) {
var0x.accept(var1.value);
}
return var0x;
}, (var0x) -> {
return Optional.ofNullable(var0x.value);
}, CH_NOID);
}
public static <T, U> Collector<T, ?, U> reducing(U var0, Function<? super T, ? extends U> var1, BinaryOperator<U> var2) {
return new Collectors.CollectorImpl(boxSupplier(var0), (var2x, var3) -> {
var2x[0] = var2.apply(var2x[0], var1.apply(var3));
}, (var1x, var2x) -> {
var1x[0] = var2.apply(var1x[0], var2x[0]);
return var1x;
}, (var0x) -> {
return var0x[0];
}, CH_NOID);
}
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> var0) {
return groupingBy(var0, toList());
}
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> var0, Collector<? super T, A, D> var1) {
return groupingBy(var0, HashMap::new, var1);
}
public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> var0, Supplier<M> var1, Collector<? super T, A, D> var2) {
Supplier var3 = var2.supplier();
BiConsumer var4 = var2.accumulator();
BiConsumer var5 = (var3x, var4x) -> {
Object var5 = Objects.requireNonNull(var0.apply(var4x), "element cannot be mapped to a null key");
Object var6 = var3x.computeIfAbsent(var5, (var1) -> {
return var3.get();
});
var4.accept(var6, var4x);
};
BinaryOperator var6 = mapMerger(var2.combiner());
if (var2.characteristics().contains(Characteristics.IDENTITY_FINISH)) {
return new Collectors.CollectorImpl(var1, var5, var6, CH_ID);
} else {
Function var8 = var2.finisher();
Function var9 = (var1x) -> {
var1x.replaceAll((var1, var2) -> {
return var8.apply(var2);
});
return var1x;
};
return new Collectors.CollectorImpl(var1, var5, var6, var9, CH_NOID);
}
}
public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> var0) {
return groupingByConcurrent(var0, ConcurrentHashMap::new, toList());
}
public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> var0, Collector<? super T, A, D> var1) {
return groupingByConcurrent(var0, ConcurrentHashMap::new, var1);
}
public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> var0, Supplier<M> var1, Collector<? super T, A, D> var2) {
Supplier var3 = var2.supplier();
BiConsumer var4 = var2.accumulator();
BinaryOperator var5 = mapMerger(var2.combiner());
BiConsumer var7;
if (var2.characteristics().contains(Characteristics.CONCURRENT)) {
var7 = (var3x, var4x) -> {
Object var5 = Objects.requireNonNull(var0.apply(var4x), "element cannot be mapped to a null key");
Object var6 = var3x.computeIfAbsent(var5, (var1) -> {
return var3.get();
});
var4.accept(var6, var4x);
};
} else {
var7 = (var3x, var4x) -> {
Object var5 = Objects.requireNonNull(var0.apply(var4x), "element cannot be mapped to a null key");
Object var6 = var3x.computeIfAbsent(var5, (var1) -> {
return var3.get();
});
synchronized(var6) {
var4.accept(var6, var4x);
}
};
}
if (var2.characteristics().contains(Characteristics.IDENTITY_FINISH)) {
return new Collectors.CollectorImpl(var1, var7, var5, CH_CONCURRENT_ID);
} else {
Function var8 = var2.finisher();
Function var9 = (var1x) -> {
var1x.replaceAll((var1, var2) -> {
return var8.apply(var2);
});
return var1x;
};
return new Collectors.CollectorImpl(var1, var7, var5, var9, CH_CONCURRENT_NOID);
}
}
public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> var0) {
return partitioningBy(var0, toList());
}
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> var0, Collector<? super T, A, D> var1) {
BiConsumer var2 = var1.accumulator();
BiConsumer var3 = (var2x, var3x) -> {
var2.accept(var0.test(var3x) ? var2x.forTrue : var2x.forFalse, var3x);
};
BinaryOperator var4 = var1.combiner();
BinaryOperator var5 = (var1x, var2x) -> {
return new Collectors.Partition(var4.apply(var1x.forTrue, var2x.forTrue), var4.apply(var1x.forFalse, var2x.forFalse));
};
Supplier var6 = () -> {
return new Collectors.Partition(var1.supplier().get(), var1.supplier().get());
};
if (var1.characteristics().contains(Characteristics.IDENTITY_FINISH)) {
return new Collectors.CollectorImpl(var6, var3, var5, CH_ID);
} else {
Function var7 = (var1x) -> {
return new Collectors.Partition(var1.finisher().apply(var1x.forTrue), var1.finisher().apply(var1x.forFalse));
};
return new Collectors.CollectorImpl(var6, var3, var5, var7, CH_NOID);
}
}
public static <T, K, U> Collector<T, ?, Map<K, U>> toMap(Function<? super T, ? extends K> var0, Function<? super T, ? extends U> var1) {
return toMap(var0, var1, throwingMerger(), HashMap::new);
}
public static <T, K, U> Collector<T, ?, Map<K, U>> toMap(Function<? super T, ? extends K> var0, Function<? super T, ? extends U> var1, BinaryOperator<U> var2) {
return toMap(var0, var1, var2, HashMap::new);
}
public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(Function<? super T, ? extends K> var0, Function<? super T, ? extends U> var1, BinaryOperator<U> var2, Supplier<M> var3) {
BiConsumer var4 = (var3x, var4x) -> {
var3x.merge(var0.apply(var4x), var1.apply(var4x), var2);
};
return new Collectors.CollectorImpl(var3, var4, mapMerger(var2), CH_ID);
}
public static <T, K, U> Collector<T, ?, ConcurrentMap<K, U>> toConcurrentMap(Function<? super T, ? extends K> var0, Function<? super T, ? extends U> var1) {
return toConcurrentMap(var0, var1, throwingMerger(), ConcurrentHashMap::new);
}
public static <T, K, U> Collector<T, ?, ConcurrentMap<K, U>> toConcurrentMap(Function<? super T, ? extends K> var0, Function<? super T, ? extends U> var1, BinaryOperator<U> var2) {
return toConcurrentMap(var0, var1, var2, ConcurrentHashMap::new);
}
public static <T, K, U, M extends ConcurrentMap<K, U>> Collector<T, ?, M> toConcurrentMap(Function<? super T, ? extends K> var0, Function<? super T, ? extends U> var1, BinaryOperator<U> var2, Supplier<M> var3) {
BiConsumer var4 = (var3x, var4x) -> {
var3x.merge(var0.apply(var4x), var1.apply(var4x), var2);
};
return new Collectors.CollectorImpl(var3, var4, mapMerger(var2), CH_CONCURRENT_ID);
}
public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> var0) {
return new Collectors.CollectorImpl(IntSummaryStatistics::new, (var1, var2) -> {
var1.accept(var0.applyAsInt(var2));
}, (var0x, var1) -> {
var0x.combine(var1);
return var0x;
}, CH_ID);
}
public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> var0) {
return new Collectors.CollectorImpl(LongSummaryStatistics::new, (var1, var2) -> {
var1.accept(var0.applyAsLong(var2));
}, (var0x, var1) -> {
var0x.combine(var1);
return var0x;
}, CH_ID);
}
public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> var0) {
return new Collectors.CollectorImpl(DoubleSummaryStatistics::new, (var1, var2) -> {
var1.accept(var0.applyAsDouble(var2));
}, (var0x, var1) -> {
var0x.combine(var1);
return var0x;
}, CH_ID);
}
static {
CH_CONCURRENT_ID = Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH));
CH_CONCURRENT_NOID = Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
CH_ID = Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
CH_UNORDERED_ID = Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH));
CH_NOID = Collections.emptySet();
}
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
CollectorImpl(Supplier<A> var1, BiConsumer<A, T> var2, BinaryOperator<A> var3, Function<A, R> var4, Set<Characteristics> var5) {
this.supplier = var1;
this.accumulator = var2;
this.combiner = var3;
this.finisher = var4;
this.characteristics = var5;
}
CollectorImpl(Supplier<A> var1, BiConsumer<A, T> var2, BinaryOperator<A> var3, Set<Characteristics> var4) {
this(var1, var2, var3, Collectors.castingIdentity(), var4);
}
public BiConsumer<A, T> accumulator() {
return this.accumulator;
}
public Supplier<A> supplier() {
return this.supplier;
}
public BinaryOperator<A> combiner() {
return this.combiner;
}
public Function<A, R> finisher() {
return this.finisher;
}
public Set<Characteristics> characteristics() {
return this.characteristics;
}
}
private static final class Partition<T> extends AbstractMap<Boolean, T> implements Map<Boolean, T> {
final T forTrue;
final T forFalse;
Partition(T var1, T var2) {
this.forTrue = var1;
this.forFalse = var2;
}
public Set<Entry<Boolean, T>> entrySet() {
return new AbstractSet<Entry<Boolean, T>>() {
public Iterator<Entry<Boolean, T>> iterator() {
SimpleImmutableEntry var1 = new SimpleImmutableEntry(false, Partition.this.forFalse);
SimpleImmutableEntry var2 = new SimpleImmutableEntry(true, Partition.this.forTrue);
return Arrays.asList(var1, var2).iterator();
}
public int size() {
return 2;
}
};
}
}
}
TerminalOp终止操作
package java.util.stream;
import java.util.Spliterator;
/**
 * A terminal operation of a stream pipeline: consumes elements of type
 * {@code E_IN} and produces a result of type {@code R}.
 */
interface TerminalOp<E_IN, R> {

    /** Shape of the input this operation accepts; {@code REFERENCE} unless overridden. */
    default StreamShape inputShape() {
        return StreamShape.REFERENCE;
    }

    /** Operation flags contributed by this op; none (0) unless overridden. */
    default int getOpFlags() {
        return 0;
    }

    /**
     * Parallel evaluation. The default simply runs the sequential evaluation,
     * reporting that fact through {@code Tripwire} when it is enabled.
     */
    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator) {
        if (Tripwire.ENABLED) {
            Tripwire.trip(this.getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
        }
        final R sequentialResult = this.evaluateSequential(helper, spliterator);
        return sequentialResult;
    }

    /** Sequential evaluation; every terminal operation has to implement it. */
    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator);
}
Stream流水线解决方案
我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:
- 用户的操作如何记录?
- 操作如何叠加?
- 叠加之后的操作如何执行?
- 执行后的结果(如果有)在哪里?
操作如何记录
参考资料
https://www.cnblogs.com/vinozly/p/5465454.html
http://www.cnblogs.com/CarpenterLee/p/6637118.html
https://www.ibm.com/developerworks/cn/java/j-java-streams-1-brian-goetz/index.html
http://www.cnblogs.com/noteless/p/9505098.html
https://blog.csdn.net/qq_36372507/article/details/78946818
https://blog.csdn.net/lz710117239/article/details/76039069
https://blog.csdn.net/lcgoing/article/details/87918010
http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html
http://java.dzone.com/articles/understanding-java-8-streams-1
http://stackoverflow.com/questions/224648/external-iterator-vs-internal-iterator
http://codereview.stackexchange.com/questions/52050/spliterator-implementation
https://www.inkling.com/read/schildt-java-complete-reference-9th/chapter-18/spliterators
https://developer.ibm.com/series/java-streams/
DRAFT:
/**
* Copyright (C), 2015-2019
* FileName: BasicStreamDemo
* Author: luo.yongqian
* Date: 2019/4/28 11:00
* Description: 第一个Stream示例
* History:
* <author> <time> <version> <desc>
* luo.yongqian 2019/4/28 11:00 1.0.0 创建
*/
package com.roboslyq.java.java8.stream;
import com.roboslyq.java.ModelDemo;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * First Stream example.
 *
 * <p>Notes on how a Stream differs from an old-style {@code for} loop:
 * <ol>
 *   <li>With {@code for}, the caller pulls the data from outside the
 *       collection (external iteration). With a Stream the iteration happens
 *       inside the library, which pushes the data to the caller.</li>
 *   <li>A {@code for} loop needs a lot of boilerplate.</li>
 *   <li>External iteration: the caller obtains an {@code Iterator}, asks
 *       {@code hasNext()} and fetches each element with {@code next()}.</li>
 *   <li>Internal iteration: the caller only starts the operation; the
 *       traversal (via a Spliterator) and the result come from the stream.</li>
 *   <li>Lazy (intermediate) methods do nothing by themselves; they run only
 *       once a terminal operation is invoked.</li>
 *   <li>Eager (terminal) methods trigger all pending operations.</li>
 * </ol>
 *
 * @author luo.yongqian
 * @since 1.0.0
 */
public class BasicStreamDemo2 {

    public static void main(String[] args) {
        List<ModelDemo> models = init();
        // The pipeline ends with collect(...), which is a terminal operation,
        // so the peek/filter/map stages DO run and their messages are printed
        // (despite the wording of the printed labels). The collected list is
        // discarded; note that map() renames every element whose id is > 5.
        models.stream()
                .peek(md -> {
                    printThread();
                    System.out.println("没有打印1"+ md.getName());
                })
                .filter(md -> md.getId() > 5)
                .map(md -> {
                    printThread();
                    System.out.println("没有打印1"+ md.getName());
                    md.setName("wqni");
                    return md;
                })
                .collect(Collectors.toList());
        // External iteration over the same (now partly renamed) elements.
        models.forEach(model -> {
            printThread();
            System.out.println("foreach打印"+model.getName());
        });
    }

    /**
     * Creates the sample data: ten models with ids 0..9 and matching names.
     *
     * @return a new mutable list of models
     */
    public static List<ModelDemo> init() {
        final int count = 10;
        List<ModelDemo> models = new ArrayList<>();
        for (int id = 0; id < count; id++) {
            ModelDemo model = new ModelDemo();
            model.setId(id);
            model.setName("roboslyq_" + id);
            models.add(model);
        }
        return models;
    }

    /** Prints the id of the thread that is currently executing. */
    static void printThread() {
        final long threadId = Thread.currentThread().getId();
        System.out.println("当前线程ID-"+threadId);
    }
}
实际操作执行顺序:
从一个最常用Stream.of操作开始,来看看整个流的执行过程。
(1)流来源
1、Stream.of(T ... )
最终返回的是ReferencePipeline
@SafeVarargs
static <T> Stream<T> ofw(T... var0) {
return Arrays.stream(var0);ww
}
2、Arrays.stream()
/**
 * Creates a stream over the whole array by delegating to the range overload
 * with the bounds {@code 0} and {@code array.length}.
 *
 * @param array the source array
 * @return a stream over all elements of {@code array}
 */
public static <T> Stream<T> stream(T[] array) {
    final int endExclusive = array.length;
    return stream(array, 0, endExclusive);
}
/**
 * Creates a non-parallel stream over a range of the array: the range is
 * wrapped in a spliterator, which is then handed to {@code StreamSupport.stream}.
 *
 * @param array          the source array
 * @param startInclusive first index of the range
 * @param endExclusive   index just past the end of the range
 * @return the stream built by {@code StreamSupport.stream}
 */
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    final boolean parallel = false;
    return StreamSupport.stream(
            spliterator(array, startInclusive, endExclusive), parallel);
}
3、StreamSupport
/**
 * Creates the source stage ({@code ReferencePipeline.Head}) of a new pipeline.
 *
 * @param spliterator source of the elements; must not be null
 * @param parallel    whether the new stream is parallel
 * @return the head stage wrapping {@code spliterator}
 * @throws NullPointerException if {@code spliterator} is null
 */
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    // Translate the spliterator characteristics into stream flags.
    final int sourceFlags = StreamOpFlag.fromCharacteristics(spliterator);
    return new ReferencePipeline.Head<>(spliterator, sourceFlags, parallel);
}
4、ReferencePipeline.Head
/**
 * Creates the source stage; all arguments are passed straight to the
 * superclass constructor (the superclass is ReferencePipeline).
 */
Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
/**
 * ReferencePipeline source-stage constructor; it only forwards to its own
 * superclass, AbstractPipeline.
 */
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
/**
 * AbstractPipeline constructor for the head (source) stage of a pipeline.
 *
 * @param source      spliterator that provides the elements
 * @param sourceFlags flags of the source; masked with {@code StreamOpFlag.STREAM_MASK}
 * @param parallel    whether the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    // The head has no predecessor, is its own source stage and sits at depth 0.
    this.previousStage = null;
    this.sourceStage = this;
    this.depth = 0;

    this.sourceSpliterator = source;
    this.parallel = parallel;

    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
}
(2)中间操作
1、ReferencePipeline.map()
/**
 * Appends a stateless mapping stage. Nothing is computed here: the method only
 * creates a {@code StatelessOp} whose upstream is {@code this}, which is how
 * the chain of operations is formed.
 *
 * @param mapper function applied to every element
 * @return the new pipeline stage
 * @throws NullPointerException if {@code mapper} is null
 */
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    final int opFlags = StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT;
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, opFlags) {
        // Wraps the downstream sink in a sink that performs the mapping.
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                // The actual work of the stage: apply the caller's function
                // and pass the result to the next sink in the chain.
                @Override
                public void accept(P_OUT u) {
                    final R mapped = mapper.apply(u);
                    downstream.accept(mapped);
                }
            };
        }
    };
}
/**
 * Sink.ChainedReference: a sink that is linked to the next sink in the chain
 * and forwards the life-cycle calls to it. Subclasses supply {@code accept}.
 */
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
    /** The next sink in the chain. */
    protected final Sink<? super E_OUT> downstream;

    /**
     * @param downstream the next sink; must not be null
     * @throws NullPointerException if {@code downstream} is null
     */
    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }

    /** Forwards the begin notification, including the size, to the next sink. */
    @Override
    public void begin(long size) {
        this.downstream.begin(size);
    }

    /** Forwards the end notification to the next sink. */
    @Override
    public void end() {
        this.downstream.end();
    }

    /** Reports whatever the next sink answers. */
    @Override
    public boolean cancellationRequested() {
        return this.downstream.cancellationRequested();
    }
}
(3)终止操作
1、Stream.collect()
/**
 * Terminal operation: accumulates the elements with {@code collector}.
 *
 * @param collector supplies the container, accumulator, finisher and characteristics
 * @return the container itself for {@code IDENTITY_FINISH} collectors,
 *         otherwise the result of applying the finisher to it
 */
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        // Concurrent collection: one shared container that every thread
        // accumulates into through forEach.
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(element -> accumulator.accept(container, element));
    }
    else {
        // Every other case (sequential streams, but also parallel streams with
        // a non-concurrent collector): build a TerminalOp with
        // ReduceOps.makeRef(collector) and evaluate the pipeline with it.
        container = evaluate(ReduceOps.makeRef(collector));
    }
    // Produce the result, applying the finisher only when it is needed.
    if (collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        return (R) container;
    }
    return collector.finisher().apply(container);
}
ReduceOps.makeRef的实现
/**
 * Builds the terminal operation that performs the mutable reduction described
 * by {@code collector}.
 *
 * @param collector source of the supplier, accumulator and combiner
 * @return a {@code ReduceOp} whose sinks accumulate into the collector's container
 * @throws NullPointerException if {@code collector} is null
 */
public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) {
    Supplier<I> containerFactory = Objects.requireNonNull(collector).supplier();
    BiConsumer<I, ? super T> folder = collector.accumulator();
    BinaryOperator<I> merger = collector.combiner();

    // Local sink class holding the container in the inherited 'state' field.
    class ReducingSink extends Box<I>
            implements AccumulatingSink<T, I, ReducingSink> {
        // A fresh container is created when the sink is started.
        @Override
        public void begin(long size) {
            state = containerFactory.get();
        }

        // Each element is folded into the container.
        @Override
        public void accept(T t) {
            folder.accept(state, t);
        }

        // Merges the container of another sink into this one.
        @Override
        public void combine(ReducingSink other) {
            state = merger.apply(state, other.state);
        }
    }

    // Instantiate the ReduceOp that hands out ReducingSink instances.
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        // An UNORDERED collector lets the op drop the ordering constraint.
        @Override
        public int getOpFlags() {
            if (collector.characteristics().contains(Collector.Characteristics.UNORDERED)) {
                return StreamOpFlag.NOT_ORDERED;
            }
            return 0;
        }
    };
}
2、AbstractPipeline.evaluate()
/**
 * Runs the pipeline with the given terminal operation and returns its result.
 * A pipeline can be evaluated only once.
 *
 * @param terminalOp the terminal operation to apply
 * @return the result produced by {@code terminalOp}
 * @throws IllegalStateException if this stage was already linked or consumed
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;

    if (isParallel()) {
        // Parallel pipeline: use the parallel evaluation of the op.
        return terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    // Sequential pipeline: use the sequential evaluation of the op.
    return terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
3、ReduceOps.ReduceOp.evaluateSequential()
/**
 * Sequential evaluation: creates a sink, lets the helper wrap it and push all
 * elements of {@code spliterator} through it, then reads the sink's result.
 */
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    return helper
            .wrapAndCopyInto(makeSink(), spliterator)
            .get();
}
4、AbstractPipeline.wrapAndCopyInto()
/**
 * Wraps {@code sink} with the sinks of the pipeline stages, pushes the
 * elements of {@code spliterator} into the wrapped sink, and returns the
 * original (unwrapped) sink.
 *
 * @throws NullPointerException if {@code sink} is null
 */
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    final Sink<P_IN> wrapped = wrapSink(Objects.requireNonNull(sink));
    copyInto(wrapped, spliterator);
    return sink;
}
/**
 * Pushes the elements of {@code spliterator} into {@code wrappedSink}.
 * Pipelines flagged as short-circuiting are delegated to
 * {@code copyIntoWithCancel}; all others use begin / forEachRemaining / end.
 *
 * @throws NullPointerException if {@code wrappedSink} is null
 */
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        copyIntoWithCancel(wrappedSink, spliterator);
        return;
    }
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    spliterator.forEachRemaining(wrappedSink);
    wrappedSink.end();
}
/**
 * Cancellable variant of {@code copyInto}: walks back to the stage at depth 0
 * and lets it traverse {@code spliterator} with {@code forEachWithCancel}.
 *
 * @return the value returned by {@code forEachWithCancel}
 */
@Override
@SuppressWarnings("unchecked")
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    @SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline stage = AbstractPipeline.this;
    // Follow previousStage links until the stage with depth 0 is reached.
    while (stage.depth > 0) {
        stage = stage.previousStage;
    }

    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    final boolean cancelled = stage.forEachWithCancel(spliterator, wrappedSink);
    wrappedSink.end();
    return cancelled;
}
5、Sink.ChainedReference
此类为Sink的内部类,实现了Sink接口。
/**
 * Sink that is chained to a downstream sink and forwards the life-cycle calls
 * to it. Subclasses supply {@code accept}.
 */
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
    /** The downstream sink, i.e. the next sink in the chain. */
    protected final Sink<? super E_OUT> downstream;

    /**
     * Stores the downstream sink.
     *
     * @throws NullPointerException if {@code downstream} is null
     */
    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }

    /** Start of processing: forwarded, with the size, to the downstream sink. */
    @Override
    public void begin(long size) {
        this.downstream.begin(size);
    }

    /** End of processing: forwarded to the downstream sink. */
    @Override
    public void end() {
        this.downstream.end();
    }

    /** Cancellation query: answers with whatever the downstream sink reports. */
    @Override
    public boolean cancellationRequested() {
        return this.downstream.cancellationRequested();
    }
}