java8数据流再识

数据流操作要么是衔接操作,要么是终止操作。当一个函数不修改数据流的底层数据源,它就是无干扰的。当一个函数的操作的执行是确定性的,它就是无状态的。

数据流可以从多种数据源创建,尤其是集合。可以有不同类型的数据流。

  1. /**
  2.      * 从多种数据源创建数据流
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 上午11:00:15
  6.      */
  7.     private void diffStreamType() {
  8.         System.out.println(“—–>diffStreamType—–>”);
  9.         Arrays.asList(“a1”“a2”“a3”)
  10.             .stream()
  11.             .findFirst()
  12.             .ifPresent(System.out::println);  // a1
  13.         Stream.of(“a1”“a2”“a3”)
  14.             .findFirst()
  15.             .ifPresent(System.out::println);  // a1
  16.         IntStream.range(14)   // 基本数据类型 // 1 2 3
  17.             .forEach(System.out::println);
  18.     }

基本数据流和对象数据流间也可以转换

  1. /**
  2.      * 基本数据流操作
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 上午11:01:26
  6.      */
  7.     private void baseStream() {
  8.         System.out.println(“—–>baseStream—–>”);
  9.         Arrays.stream(new int[] {123})
  10.             .map(n -> 2 * n + 1)
  11.             .average() // 终止操作,求平均值
  12.             .ifPresent(System.out::println);  // 5.0
  13.         Stream.of(“a1”“a2”“a3”)
  14.             .map(s -> s.substring(1))
  15.             .mapToInt(Integer::parseInt) // 对象数据流转换为基本数据流
  16.             .max()
  17.             .ifPresent(System.out::println);  // 3
  18.         IntStream.range(14)
  19.             .mapToObj(i -> “a” + i) // 基本数据流转换为对象数据流
  20.             .forEach(System.out::println); // a1 a2 a3
  21.     }

数据流处理时,衔接操作的一个重要特性就是延迟性,在调用链上是垂直移动的,减少每个元素上所执行的实际操作数量。

  1. /**
  2.      * 处理顺序
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 上午11:08:33
  6.      */
  7.     private void streamHandleSort() {
  8.         System.out.println(“—–>streamHandleSort—–>”);
  9.         Stream.of(“d2”“a2”“b1”“b3”“c”)
  10.             .filter(s -> {
  11.                 System.out.println(“filter: “ + s);
  12.                 return true;
  13.             })
  14.             .forEach(s -> System.out.println(“forEach: “ + s));
  15.         // filter:  d2 forEach: d2 … 每个元素在调用链上垂直移动
  16.         Stream.of(“d2”“a2”“b1”“b3”“c”)
  17.             .map(s -> {
  18.                 System.out.println(“map: “ + s);
  19.                 return s.toUpperCase();
  20.             })
  21.             .anyMatch(s -> {
  22.                 System.out.println(“anyMatch: “ + s);
  23.                 return s.startsWith(“A”);
  24.             });
  25.         // map:d2 anyMatch:D2 map:a2 anyMatch:A2 anyMatch返回true时终止
  26.         Stream.of(“d2”“a2”“b1”“b3”“c”)
  27.             .map(s -> {
  28.                 System.out.println(“map: “ + s);
  29.                 return s.toUpperCase();
  30.             })
  31.             .filter(s -> {
  32.                 System.out.println(“filter: “ + s);
  33.                 return s.startsWith(“A”);
  34.             })
  35.             .forEach(s -> System.out.println(“forEach: “ + s));
  36.         // map和filter会对底层集合的每个字符串调用五次,而forEach只会调用一次
  37.         Stream.of(“d2”“a2”“b1”“b3”“c”)
  38.             .filter(s -> {
  39.                 System.out.println(“filter: “ + s);
  40.                 return s.startsWith(“a”);
  41.             })
  42.             .map(s -> {
  43.                 System.out.println(“map: “ + s);
  44.                 return s.toUpperCase();
  45.             })
  46.             .forEach(s -> System.out.println(“forEach: “ + s));
  47.         // filter移动到调用链的顶端  map只会调用一次 执行更快
  48.         Stream.of(“d2”“a2”“b1”“b3”“c”)
  49.             .sorted((s1, s2) -> {
  50.                 System.out.printf(“sort: %s; %s\n”, s1, s2);
  51.                 return s1.compareTo(s2);
  52.             })
  53.             .filter(s -> {
  54.                 System.out.println(“filter: “ + s);
  55.                 return s.startsWith(“a”);
  56.             })
  57.             .map(s -> {
  58.                 System.out.println(“map: “ + s);
  59.                 return s.toUpperCase();
  60.             })
  61.             .forEach(s -> System.out.println(“forEach: “ + s));
  62.         // 排序是一类特殊的衔接操作。它是有状态的操作,因为你需要在处理中保存状态来对集合中的元素排序
  63.         Stream.of(“d2”“a2”“b1”“b3”“c”)
  64.             .filter(s -> {
  65.                 System.out.println(“filter: “ + s);
  66.                 return s.startsWith(“a”);
  67.             })
  68.             .sorted((s1, s2) -> {
  69.                 System.out.printf(“sort: %s; %s\n”, s1, s2);
  70.                 return s1.compareTo(s2);
  71.             })
  72.             .map(s -> {
  73.                 System.out.println(“map: “ + s);
  74.                 return s.toUpperCase();
  75.             })
  76.             .forEach(s -> System.out.println(“forEach: “ + s));
  77.         // 重排调用链来优化性能,这个例子中sorted永远不会调用,极大提升性能
  78.     }

java8的数据流不能被复用。一旦你调用了任何终止操作,数据流就关闭了,要克服这个限制,我们需要为每个我们想要执行的终止操作创建新的数据流调用链。例如,我们创建一个数据流供应器,来构建新的数据流,并且设置好所有衔接操作。

  1. /**
  2.      * 复用数据流
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 上午11:40:02
  6.      */
  7.     private void streamReuse() {
  8.         System.out.println(“—–>streamReuse—–>”);
  9.         Stream<String> stream =
  10.                 Stream.of(“d2”“a2”“b1”“b3”“c”)
  11.                     .filter(s -> s.startsWith(“a”));
  12.         stream.anyMatch(s -> true);    // ok
  13. //      stream.noneMatch(s -> true);   // exception  java.lang.IllegalStateException: stream has already been operated upon or closed
  14.         Supplier<Stream<String>> streamSupplier =
  15.                 () -> Stream.of(“d2”“a2”“b1”“b3”“c”)
  16.                         .filter(s -> s.startsWith(“a”));
  17.         // 每次对get()的调用都构造了一个新的数据流,我们将其保存来调用终止操作
  18.         streamSupplier.get().anyMatch(s -> true);   // ok
  19.         streamSupplier.get().noneMatch(s -> true);  // ok
  20.     }

collect

collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。collect接受收集器(Collector),它由四个不同的操作组成:供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)。

  1. /**
  2.      * collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。
  3.      * collect接受收集器(Collector),它由四个不同的操作组成:
  4.      *  供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)
  5.      * 
  6.      * @author wenqy
  7.      * @date 2020年1月17日 上午11:45:18
  8.      */
  9.     private void streamCollect() {
  10.         System.out.println(“—–>streamCollect—–>”);
  11.         List<Person> persons = getPersionList();
  12.         List<Person> filtered =
  13.             persons
  14.                 .stream()
  15.                 .filter(p -> p.getFirstName().startsWith(“P”))
  16.                 .collect(Collectors.toList()); // 构造list
  17.         System.out.println(filtered);    // [Person [firstName=Peter, lastName=null, age=23], Person [firstName=Pamela, lastName=null, age=23]]
  18.         Map<Integer, List<Person>> personsByAge = persons
  19.             .stream()
  20.             .collect(Collectors.groupingBy(p -> p.getAge())); // 构造map key: age
  21.         personsByAge
  22.             .forEach((age, p) -> System.out.format(“age %s: %s\n”, age, p));
  23.         IntSummaryStatistics ageSummary =
  24.             persons
  25.                 .stream()
  26.                 .collect(Collectors.summarizingInt(p -> p.getAge()));
  27.         System.out.println(ageSummary); // 统计:简单计算最小年龄、最大年龄、算术平均年龄、总和和数量
  28.         String phrase = persons
  29.             .stream()
  30.             .filter(p -> p.getAge() >= 18)
  31.             .map(p -> p.getFirstName()) // 键必须是唯一的,否则会抛出IllegalStateException异常
  32.             .collect(Collectors.joining(” and ““In China “” are of legal age.”));
  33.         System.out.println(phrase); // 所有人连接为一个字符串
  34.         Collector<Person, StringJoiner, String> personNameCollector =
  35.             Collector.of(
  36.                 () -> new StringJoiner(” | “),          // supplier
  37.                 (j, p) -> j.add(p.getFirstName().toUpperCase()),  // accumulator
  38.                 (j1, j2) -> j1.merge(j2),               // combiner
  39.                 StringJoiner::toString);                // finisher
  40.         String names = persons
  41.             .stream()
  42.             .collect(personNameCollector);
  43.         System.out.println(names);  // MAX | PETER | PAMELA | DAVID
  44.         // 构建自己特殊收集器。将流中的所有人转换为一个字符串,包含所有大写的名称,并以|分割。
  45.     }

flatMap

我们已经了解了如何通过使用map操作,将流中的对象转换为另一种类型。map有时十分受限,因为每个对象只能映射为一个其它对象。但如何我希望将一个对象转换为多个或零个其他对象呢?flatMap这时就会派上用场。

flatMap将流中的每个元素,转换为其它对象的流。所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。这些流的内容之后会放进flatMap所返回的流中。

  1. /**
  2.  * flatMap将流中的每个元素,转换为其它对象的流。
  3.  * 所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。
  4.  * 这些流的内容之后会放进flatMap所返回的流中
  5.  * 
  6.  * @author wenqy
  7.  * @date 2020年1月17日 下午1:38:09
  8.  */
  9. private void streamFlatMap() {
  10.     System.out.println(“—–>streamFlatMap—–>”);
  11.     List<Foo> foos = new ArrayList<>();
  12.     // create foos
  13.     IntStream
  14.         .range(14)
  15.         .forEach(i -> foos.add(new Foo(“Foo” + i)));
  16.     // create bars
  17.     foos.forEach(f ->
  18.         IntStream
  19.             .range(14)
  20.             .forEach(i -> f.bars.add(new Bar(“Bar” + i + ” <- “ + f.name))));
  21.     foos.stream()
  22.         .flatMap(f -> f.bars.stream())
  23.         .forEach(b -> System.out.println(b.name)); // 将含有三个foo对象中的流转换为含有九个bar对象的流
  24.     IntStream.range(14)
  25.         .mapToObj(i -> new Foo(“Foo” + i))
  26.         .peek(f -> IntStream.range(14)
  27.             .mapToObj(i -> new Bar(“Bar” + i + ” <- “ + f.name))
  28.             .forEach(f.bars::add)) // 简化为流式操作的单一流水线
  29.         .flatMap(f -> f.bars.stream())
  30.         .forEach(b -> System.out.println(b.name));
  31.     Optional.of(new Outer())
  32.         .flatMap(o -> Optional.ofNullable(o.nested))
  33.         .flatMap(n -> Optional.ofNullable(n.inner))
  34.         .flatMap(i -> Optional.ofNullable(i.foo))
  35.         .ifPresent(System.out::println);
  36.     // 如果存在的话,每个flatMap的调用都会返回预期对象的Optional包装,
  37.     // 否则为null的Optional包装,避免潜在NullPointerException
  38. }

reduce

归约操作将所有流中的元素组合为单一结果。Java8支持三种不同类型的reduce方法。

  1. /**
  2.      * 归约操作将所有流中的元素组合为单一结果
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 下午2:01:23
  6.      */
  7.     private void streamReduce() {
  8.         System.out.println(“—–>streamReduce—–>”);
  9.         List<Person> persons = getPersionList();
  10.         persons
  11.             .stream()
  12.             .reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2)
  13.             .ifPresent(System.out::println);    // 计算年龄最大的人 Pamela
  14.         Person result =
  15.             persons
  16.                 .stream()
  17.                 .reduce(new Person(“”0), (p1, p2) -> {
  18.                     p1.setAge(p1.getAge() + p2.getAge());
  19.                     p1.setFirstName(p1.getFirstName() + p2.getFirstName());
  20.                     return p1;  // 构造带有聚合后名称和年龄的新Person对象
  21.                 });
  22.         // name=MaxPeterPamelaDavid; age=76
  23.         System.out.format(“name=%s; age=%s.\n”, result.getFirstName(), result.getAge());
  24.         Integer ageSum = persons
  25.             .stream()
  26.             .reduce(0, (sum, p) -> sum += p.getAge(), (sum1, sum2) -> sum1 + sum2);
  27.         System.out.println(ageSum);  // 计算所有人的年龄总和 76
  28.         Integer ageSum2 = persons
  29.             .stream()
  30.             .reduce(0,
  31.                 (sum, p) -> {
  32.                     System.out.format(“accumulator: sum=%s; person=%s\n”, sum, p);
  33.                     return sum += p.getAge();
  34.                 },
  35.                 (sum1, sum2) -> {
  36.                     System.out.format(“combiner: sum1=%s; sum2=%s\n”, sum1, sum2);
  37.                     return sum1 + sum2;
  38.                 });
  39.         System.out.println(ageSum2); // 输出调试信息,combiner并没有输出
  40.         Integer ageSum3 = persons
  41.             .parallelStream()
  42.             .reduce(0,
  43.                 (sum, p) -> {
  44.                     System.out.format(“accumulator: sum=%s; person=%s [%s]\n”, sum, p, Thread.currentThread().getName());
  45.                     return sum += p.getAge();
  46.                 },
  47.                 (sum1, sum2) -> {
  48.                     System.out.format(“combiner: sum1=%s; sum2=%s [%s]\n”, sum1, sum2, Thread.currentThread().getName());
  49.                     return sum1 + sum2;
  50.                 });
  51.         System.out.println(ageSum3); // 并行方式
  52.     }

parallelStream

流可以并行执行,在大量输入元素上可以提升运行时的性能。并行流使用公共的ForkJoinPool,由ForkJoinPool.commonPool()方法提供。底层线程池的大小最大为五个线程 — 取决于CPU的物理核数。

组合器函数只在并行流中调用,而不在串行流中调用

  1. /**
  2.      * 并行流
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 下午2:31:57
  6.      */
  7.     private void streamParallel() {
  8.         System.out.println(“—–>streamParallel—–>”);
  9.         ForkJoinPool commonPool = ForkJoinPool.commonPool();
  10.         System.out.println(commonPool.getParallelism());    // 底层线程池的大小 — 取决于CPU的物理核数 本机 默认 7 
  11.         // 可用JVM参数增减 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
  12.         Arrays.asList(“a1”“a2”“b1”“c2”“c1”)
  13.             .parallelStream()
  14.             .filter(s -> {
  15.                 System.out.format(“filter: %s [%s]\n”,
  16.                     s, Thread.currentThread().getName());
  17.                 return true;
  18.             })
  19.             .map(s -> {
  20.                 System.out.format(“map: %s [%s]\n”,
  21.                     s, Thread.currentThread().getName());
  22.                 return s.toUpperCase();
  23.             })
  24.             .forEach(s -> System.out.format(“forEach: %s [%s]\n”,
  25.                 s, Thread.currentThread().getName()));
  26.         // 并行流使用了所有公共的ForkJoinPool中的可用线程来执行流式操作
  27.         Arrays.asList(“a1”“a2”“b1”“c2”“c1”)
  28.             .parallelStream()
  29.             .filter(s -> {
  30.                 System.out.format(“filter: %s [%s]\n”,
  31.                     s, Thread.currentThread().getName());
  32.                 return true;
  33.             })
  34.             .map(s -> {
  35.                 System.out.format(“map: %s [%s]\n”,
  36.                     s, Thread.currentThread().getName());
  37.                 return s.toUpperCase();
  38.             })
  39.             .sorted((s1, s2) -> {
  40.                 System.out.format(“sort: %s <> %s [%s]\n”,
  41.                     s1, s2, Thread.currentThread().getName());
  42.                 return s1.compareTo(s2);
  43.             })
  44.             .forEach(s -> System.out.format(“forEach: %s [%s]\n”,
  45.                 s, Thread.currentThread().getName()));
  46.         // sort看起来只在主线程上串行执行。实际上,并行流上的sort在背后使用了Java8中新的方法Arrays.parallelSort()。
  47.         // 如javadoc所说,这个方法会参照数据长度来决定以串行或并行来执行,如果指定数据的长度小于最小粒度,它使用相应的Arrays.sort方法来排序
  48.         // 所有并行流操作都共享相同的JVM相关的公共ForkJoinPool。所以你可能需要避免实现又慢又卡的流式操作,因为它可能会拖慢你应用中严重依赖并行流的其它部分。
  49.     }

参考

https://github.com/winterbe/java8-tutorial java8教程

https://wizardforcel.gitbooks.io/modern-java/content/ 中文译站

发表评论

电子邮件地址不会被公开。 必填项已用*标注

9 + 8 = ?