数据流操作要么是衔接操作,要么是终止操作。当一个函数不修改数据流的底层数据源,它就是无干扰的。当一个函数的操作的执行是确定性的,它就是无状态的。
数据流可以从多种数据源创建,尤其是集合。可以有不同类型的数据流。
- /**
- * 从多种数据源创建数据流
- *
- * @author wenqy
- * @date 2020年1月17日 上午11:00:15
- */
- private void diffStreamType() {
- System.out.println(“—–>diffStreamType—–>”);
- Arrays.asList(“a1”, “a2”, “a3”)
- .stream()
- .findFirst()
- .ifPresent(System.out::println); // a1
- Stream.of(“a1”, “a2”, “a3”)
- .findFirst()
- .ifPresent(System.out::println); // a1
- IntStream.range(1, 4) // 基本数据类型 // 1 2 3
- .forEach(System.out::println);
- }
基本数据流和对象数据流间也可以转换
- /**
- * 基本数据流操作
- *
- * @author wenqy
- * @date 2020年1月17日 上午11:01:26
- */
- private void baseStream() {
- System.out.println(“—–>baseStream—–>”);
- Arrays.stream(new int[] {1, 2, 3})
- .map(n -> 2 * n + 1)
- .average() // 终止操作,求平均值
- .ifPresent(System.out::println); // 5.0
- Stream.of(“a1”, “a2”, “a3”)
- .map(s -> s.substring(1))
- .mapToInt(Integer::parseInt) // 对象数据流转换为基本数据流
- .max()
- .ifPresent(System.out::println); // 3
- IntStream.range(1, 4)
- .mapToObj(i -> “a” + i) // 基本数据流转换为对象数据流
- .forEach(System.out::println); // a1 a2 a3
- }
数据流处理时,衔接操作的一个重要特性就是延迟性,在调用链上是垂直移动的,减少每个元素上所执行的实际操作数量。
- /**
- * 处理顺序
- *
- * @author wenqy
- * @date 2020年1月17日 上午11:08:33
- */
- private void streamHandleSort() {
- System.out.println(“—–>streamHandleSort—–>”);
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .filter(s -> {
- System.out.println(“filter: “ + s);
- return true;
- })
- .forEach(s -> System.out.println(“forEach: “ + s));
- // filter: d2 forEach: d2 … 每个元素在调用链上垂直移动
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .map(s -> {
- System.out.println(“map: “ + s);
- return s.toUpperCase();
- })
- .anyMatch(s -> {
- System.out.println(“anyMatch: “ + s);
- return s.startsWith(“A”);
- });
- // map:d2 anyMatch:D2 map:a2 anyMatch:A2 anyMatch返回true时终止
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .map(s -> {
- System.out.println(“map: “ + s);
- return s.toUpperCase();
- })
- .filter(s -> {
- System.out.println(“filter: “ + s);
- return s.startsWith(“A”);
- })
- .forEach(s -> System.out.println(“forEach: “ + s));
- // map和filter会对底层集合的每个字符串调用五次,而forEach只会调用一次
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .filter(s -> {
- System.out.println(“filter: “ + s);
- return s.startsWith(“a”);
- })
- .map(s -> {
- System.out.println(“map: “ + s);
- return s.toUpperCase();
- })
- .forEach(s -> System.out.println(“forEach: “ + s));
- // filter移动到调用链的顶端 map只会调用一次 执行更快
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .sorted((s1, s2) -> {
- System.out.printf(“sort: %s; %s\n”, s1, s2);
- return s1.compareTo(s2);
- })
- .filter(s -> {
- System.out.println(“filter: “ + s);
- return s.startsWith(“a”);
- })
- .map(s -> {
- System.out.println(“map: “ + s);
- return s.toUpperCase();
- })
- .forEach(s -> System.out.println(“forEach: “ + s));
- // 排序是一类特殊的衔接操作。它是有状态的操作,因为你需要在处理中保存状态来对集合中的元素排序
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .filter(s -> {
- System.out.println(“filter: “ + s);
- return s.startsWith(“a”);
- })
- .sorted((s1, s2) -> {
- System.out.printf(“sort: %s; %s\n”, s1, s2);
- return s1.compareTo(s2);
- })
- .map(s -> {
- System.out.println(“map: “ + s);
- return s.toUpperCase();
- })
- .forEach(s -> System.out.println(“forEach: “ + s));
- // 重排调用链来优化性能,这个例子中sorted永远不会调用,极大提升性能
- }
java8的数据流不能被复用。一旦你调用了任何终止操作,数据流就关闭了,要克服这个限制,我们需要为每个我们想要执行的终止操作创建新的数据流调用链。例如,我们创建一个数据流供应器,来构建新的数据流,并且设置好所有衔接操作。
- /**
- * 复用数据流
- *
- * @author wenqy
- * @date 2020年1月17日 上午11:40:02
- */
- private void streamReuse() {
- System.out.println(“—–>streamReuse—–>”);
- Stream<String> stream =
- Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .filter(s -> s.startsWith(“a”));
- stream.anyMatch(s -> true); // ok
- // stream.noneMatch(s -> true); // exception java.lang.IllegalStateException: stream has already been operated upon or closed
- Supplier<Stream<String>> streamSupplier =
- () -> Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
- .filter(s -> s.startsWith(“a”));
- // 每次对get()的调用都构造了一个新的数据流,我们将其保存来调用终止操作
- streamSupplier.get().anyMatch(s -> true); // ok
- streamSupplier.get().noneMatch(s -> true); // ok
- }
collect
collect
是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。collect
接受收集器(Collector
),它由四个不同的操作组成:供应器(supplier
)、累加器(accumulator
)、组合器(combiner
)和终止器(finisher
)。
- /**
- * collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。
- * collect接受收集器(Collector),它由四个不同的操作组成:
- * 供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)
- *
- * @author wenqy
- * @date 2020年1月17日 上午11:45:18
- */
- private void streamCollect() {
- System.out.println(“—–>streamCollect—–>”);
- List<Person> persons = getPersionList();
- List<Person> filtered =
- persons
- .stream()
- .filter(p -> p.getFirstName().startsWith(“P”))
- .collect(Collectors.toList()); // 构造list
- System.out.println(filtered); // [Person [firstName=Peter, lastName=null, age=23], Person [firstName=Pamela, lastName=null, age=23]]
- Map<Integer, List<Person>> personsByAge = persons
- .stream()
- .collect(Collectors.groupingBy(p -> p.getAge())); // 构造map key: age
- personsByAge
- .forEach((age, p) -> System.out.format(“age %s: %s\n”, age, p));
- IntSummaryStatistics ageSummary =
- persons
- .stream()
- .collect(Collectors.summarizingInt(p -> p.getAge()));
- System.out.println(ageSummary); // 统计:简单计算最小年龄、最大年龄、算术平均年龄、总和和数量
- String phrase = persons
- .stream()
- .filter(p -> p.getAge() >= 18)
- .map(p -> p.getFirstName()) // 键必须是唯一的,否则会抛出IllegalStateException异常
- .collect(Collectors.joining(” and “, “In China “, ” are of legal age.”));
- System.out.println(phrase); // 所有人连接为一个字符串
- Collector<Person, StringJoiner, String> personNameCollector =
- Collector.of(
- () -> new StringJoiner(” | “), // supplier
- (j, p) -> j.add(p.getFirstName().toUpperCase()), // accumulator
- (j1, j2) -> j1.merge(j2), // combiner
- StringJoiner::toString); // finisher
- String names = persons
- .stream()
- .collect(personNameCollector);
- System.out.println(names); // MAX | PETER | PAMELA | DAVID
- // 构建自己特殊收集器。将流中的所有人转换为一个字符串,包含所有大写的名称,并以|分割。
- }
flatMap
我们已经了解了如何通过使用map操作,将流中的对象转换为另一种类型。map有时十分受限,因为每个对象只能映射为一个其它对象。但如何我希望将一个对象转换为多个或零个其他对象呢?flatMap
这时就会派上用场。
flatMap
将流中的每个元素,转换为其它对象的流。所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。这些流的内容之后会放进flatMap
所返回的流中。
- /**
- * flatMap将流中的每个元素,转换为其它对象的流。
- * 所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。
- * 这些流的内容之后会放进flatMap所返回的流中
- *
- * @author wenqy
- * @date 2020年1月17日 下午1:38:09
- */
- private void streamFlatMap() {
- System.out.println(“—–>streamFlatMap—–>”);
- List<Foo> foos = new ArrayList<>();
- // create foos
- IntStream
- .range(1, 4)
- .forEach(i -> foos.add(new Foo(“Foo” + i)));
- // create bars
- foos.forEach(f ->
- IntStream
- .range(1, 4)
- .forEach(i -> f.bars.add(new Bar(“Bar” + i + ” <- “ + f.name))));
- foos.stream()
- .flatMap(f -> f.bars.stream())
- .forEach(b -> System.out.println(b.name)); // 将含有三个foo对象中的流转换为含有九个bar对象的流
- IntStream.range(1, 4)
- .mapToObj(i -> new Foo(“Foo” + i))
- .peek(f -> IntStream.range(1, 4)
- .mapToObj(i -> new Bar(“Bar” + i + ” <- “ + f.name))
- .forEach(f.bars::add)) // 简化为流式操作的单一流水线
- .flatMap(f -> f.bars.stream())
- .forEach(b -> System.out.println(b.name));
- Optional.of(new Outer())
- .flatMap(o -> Optional.ofNullable(o.nested))
- .flatMap(n -> Optional.ofNullable(n.inner))
- .flatMap(i -> Optional.ofNullable(i.foo))
- .ifPresent(System.out::println);
- // 如果存在的话,每个flatMap的调用都会返回预期对象的Optional包装,
- // 否则为null的Optional包装,避免潜在NullPointerException
- }
reduce
归约操作将所有流中的元素组合为单一结果。Java8支持三种不同类型的reduce
方法。
- /**
- * 归约操作将所有流中的元素组合为单一结果
- *
- * @author wenqy
- * @date 2020年1月17日 下午2:01:23
- */
- private void streamReduce() {
- System.out.println(“—–>streamReduce—–>”);
- List<Person> persons = getPersionList();
- persons
- .stream()
- .reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2)
- .ifPresent(System.out::println); // 计算年龄最大的人 Pamela
- Person result =
- persons
- .stream()
- .reduce(new Person(“”, 0), (p1, p2) -> {
- p1.setAge(p1.getAge() + p2.getAge());
- p1.setFirstName(p1.getFirstName() + p2.getFirstName());
- return p1; // 构造带有聚合后名称和年龄的新Person对象
- });
- // name=MaxPeterPamelaDavid; age=76
- System.out.format(“name=%s; age=%s.\n”, result.getFirstName(), result.getAge());
- Integer ageSum = persons
- .stream()
- .reduce(0, (sum, p) -> sum += p.getAge(), (sum1, sum2) -> sum1 + sum2);
- System.out.println(ageSum); // 计算所有人的年龄总和 76
- Integer ageSum2 = persons
- .stream()
- .reduce(0,
- (sum, p) -> {
- System.out.format(“accumulator: sum=%s; person=%s\n”, sum, p);
- return sum += p.getAge();
- },
- (sum1, sum2) -> {
- System.out.format(“combiner: sum1=%s; sum2=%s\n”, sum1, sum2);
- return sum1 + sum2;
- });
- System.out.println(ageSum2); // 输出调试信息,combiner并没有输出
- Integer ageSum3 = persons
- .parallelStream()
- .reduce(0,
- (sum, p) -> {
- System.out.format(“accumulator: sum=%s; person=%s [%s]\n”, sum, p, Thread.currentThread().getName());
- return sum += p.getAge();
- },
- (sum1, sum2) -> {
- System.out.format(“combiner: sum1=%s; sum2=%s [%s]\n”, sum1, sum2, Thread.currentThread().getName());
- return sum1 + sum2;
- });
- System.out.println(ageSum3); // 并行方式
- }
parallelStream
流可以并行执行,在大量输入元素上可以提升运行时的性能。并行流使用公共的ForkJoinPool
,由ForkJoinPool.commonPool()
方法提供。底层线程池的大小最大为五个线程 — 取决于CPU的物理核数。
组合器函数只在并行流中调用,而不在串行流中调用
- /**
- * 并行流
- *
- * @author wenqy
- * @date 2020年1月17日 下午2:31:57
- */
- private void streamParallel() {
- System.out.println(“—–>streamParallel—–>”);
- ForkJoinPool commonPool = ForkJoinPool.commonPool();
- System.out.println(commonPool.getParallelism()); // 底层线程池的大小 — 取决于CPU的物理核数 本机 默认 7
- // 可用JVM参数增减 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
- Arrays.asList(“a1”, “a2”, “b1”, “c2”, “c1”)
- .parallelStream()
- .filter(s -> {
- System.out.format(“filter: %s [%s]\n”,
- s, Thread.currentThread().getName());
- return true;
- })
- .map(s -> {
- System.out.format(“map: %s [%s]\n”,
- s, Thread.currentThread().getName());
- return s.toUpperCase();
- })
- .forEach(s -> System.out.format(“forEach: %s [%s]\n”,
- s, Thread.currentThread().getName()));
- // 并行流使用了所有公共的ForkJoinPool中的可用线程来执行流式操作
- Arrays.asList(“a1”, “a2”, “b1”, “c2”, “c1”)
- .parallelStream()
- .filter(s -> {
- System.out.format(“filter: %s [%s]\n”,
- s, Thread.currentThread().getName());
- return true;
- })
- .map(s -> {
- System.out.format(“map: %s [%s]\n”,
- s, Thread.currentThread().getName());
- return s.toUpperCase();
- })
- .sorted((s1, s2) -> {
- System.out.format(“sort: %s <> %s [%s]\n”,
- s1, s2, Thread.currentThread().getName());
- return s1.compareTo(s2);
- })
- .forEach(s -> System.out.format(“forEach: %s [%s]\n”,
- s, Thread.currentThread().getName()));
- // sort看起来只在主线程上串行执行。实际上,并行流上的sort在背后使用了Java8中新的方法Arrays.parallelSort()。
- // 如javadoc所说,这个方法会参照数据长度来决定以串行或并行来执行,如果指定数据的长度小于最小粒度,它使用相应的Arrays.sort方法来排序
- // 所有并行流操作都共享相同的JVM相关的公共ForkJoinPool。所以你可能需要避免实现又慢又卡的流式操作,因为它可能会拖慢你应用中严重依赖并行流的其它部分。
- }
参考
https://github.com/winterbe/java8-tutorial java8教程
https://wizardforcel.gitbooks.io/modern-java/content/ 中文译站
发表评论