Java8 之 Stream

wiki

近些年来,随着大数据概念的崛起,函数式、流式、链式编程也逐渐开始走进主流视野,而 Stream 就是使用类 SQL 语句方式,将结构化和集合化数据,使用一种更加符合直觉的方式进行处理,代码结构也更加简洁

特点

  • 无结构 : 管道从 source 端获取数据,经中间操作后,输出到 sink 端,而原始的输入数据不会收到任何影响
  • 无索引 : 无法使用索引就行随机访问
  • 惰性化 : 大部分 Stream 操作是延迟的,直到最后触发时才回回溯计算,输出结果
  • 并行性 : 可以使用 Fork-Join 的方式处理数据流,提高效率
  • 短路性 : 有一些 short-circuiting 操作,无需遍历全部数据,就可输出结果

类型

  • 中间操作(Intermediate) : 一个流其后可接无数个中间操作,可以完成过滤和分组等操作,然后扔会返回一个中间操作,由于其惰性,在终止操作前,不会触发流处理
  • 终止操作(Terminal) : 一个流只能有一个终止操作,其真正触发流处理,它是流的最后一个操作

1. 流创建

1.1. 集合

从 JRE 8 开始,Collection 提供了获取流的方法

1
2
3
List<String> list = ...;
list.stream(); // 顺序流
list.parallelStream(); // 并行流

1.2. 数组

Arrays 类提供了静态方法 stream() 获取流

1
2
3
Integer[] arr = ...;
Arrays.stream(arr);
Arrays.stream(arr, 0, arr.length);

1.3. Stream 类

util 包中新增了 Stream 接口,其提供获取流的静态方法,可以创建有限流、无限流和空流

1
2
3
4
5
6
7
// 有限流
Stream.of("a", "b", "c");
// 无限流,可以使用 limit 等方式取有限部分
Stream.iterate(1, f -> f * 2).limit(5).forEach(System.out::println);
Stream.generate(() -> Math.random()).limit(5).forEach(System.out::println);
// 空流,一般用来避免调用流参数时的空指针异常
Stream.empty();

2. 流转换

Stream 中间操作是惰性的,只会在遇到终止操作时一次性回溯式处理数据

2.1. 切片

  • distinct()

根据 equals() 方法去重,可重写该方法实现定制去重。对顺序流会保留最先出现的元素,无序流则不同元素的选择不一定是稳定的

  • filter(Predicate predicate)

筛选,抛弃返回 false 的元素

  • limit(long maxSize)

截断,使其元素长度不超过给定数量

  • peek(Consumer action)

消费或处理每个元素,常用来打印中间操作结果

  • skip(long n)

跳过元素,返回一个扔掉前 n 个元素的流,如果元素不足 n ,返回空流。

2.2. 映射

  • map(Function f)

  • mapToInt(ToIntFunction f)

一对一:其会应用到每一个元素上,并将其映射成一个新元素。同样有 Double、Long 类型

1
list.stream().map(e -> e.toUpperCase()).forEach(System.out::println);
  • flatMap(Function f)

  • flatMapToInt(Function f)

多对一:其会应用到每一个元素上,并将元素映射成另一个流,然后把所有流连接成一个流。同样有 Double、Long 类型

1
2
3
4
5
6
7
String s1 = "a,b,c";
String s2 = "d,e,f";
List<String> l = new ArrayList<>();
l.add(s1);
l.add(s2);
List<String> result = l.stream().flatMap(e -> Arrays.stream(e.split(","))).collect(Collectors.toList());
System.out.println(result); // [a, b, c, d, e, f]

2.3. 排序

  • sorted()

  • sorted(Comparator comp)

产生新流,按照自然顺序或者比较器顺序排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//自然序排序一个list
list.stream().sorted()

//自然序逆序元素,使用Comparator 提供的reverseOrder() 方法
list.stream().sorted(Comparator.reverseOrder())

//使用 Comparator 来排序一个list
list.stream().sorted(Comparator.comparing(Student::getAge))

//使用多字段 Comparator 来排序一个list
list.stream().sorted(Comparator.comparing(Student::getAge).thenComparing(Student::getLevel, Comparator.reverseOrder()))

//颠倒使用 Comparator 来排序一个list的顺序,使用Comparator 提供的reverseOrder() 方法
list.stream().sorted(Comparator.comparing(Student::getAge).reversed())

3. 流终止

终止操作会在流最后生成结果

3.1. 查找

查找的方法全是 short-circuiting 方法,当匹配到不符合条件的元素时,会立即停止流,输出结果

  • allMatch(Predicate p)

检查是否匹配所有元素。只有所有的元素都匹配条件时,方法才会返回 true

  • anyMatch(Predicate p)

检查是否匹配所有元素。只要有任意一个元素符合条件,方法就会返回 true

  • noneMatch(Predicate p)

检查是否匹配所有元素。只有所有的元素都不符合条件时,方法才会返回 true

  • findFirst()

返回第一个元素,比如排序之后的第一名

  • findAny()

返回当前流中的任意元素

3.2. 统计

统计方法会把所有元素全部遍历完才会输出结果

  • count()

返回流中元素总数

  • max(Comparator c)

返回流中最大值

  • min(Comparator c)

返回流中最小值

  • forEach(Consumer c)

  • forEachOrdered(Consumer c)

按照流的遇到顺序为此流的每个元素执行一个操作

1
2
3
4
5
6
7
List<User> list = ...;
int max = list.stream().mapToInt(User::getAge).max().orElse(0);
int min = list.stream().mapToInt(User::getAge).min().orElse(0);
int sum = list.stream().mapToInt(User::getAge).sum();
double average = list.stream().mapToInt(User::getAge).average().getAsDouble();
// 自定比较器
max = list.stream().map(User::getAge).max(Comparator.comparing(Integer::intValue)).orElse(0);

3.3. 规约

  • reduce(BinaryOperator<T> accumulator)

  • reduce(T identity, BinaryOperator<T> accumulator)

  • reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)

可以将流中的元素反复结合起来,得到一个值

参数含义:

  • accumulator : BinaryOperator<T> 实现自 BiFunction<<T,T,T> , 其 3 个泛型都是一样的, 即实现将前两个元素结合, 输出第三个
  • identity : 即元素的初始值
  • combiner : 合并多线程计算的结果
1
2
3
4
5
6
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
Integer sum = list.stream().reduce(0, (x, y) -> x + y);
System.out.println(sum);

Optional<Double> op = employees.stream().map(Employee::getSalary).reduce(Double::sum);
System.out.println(op.get());

3.4. 收集

  • collect(supplier, accumulator, combiner)

类似 reduce 的 3 参数构造方法

  • supplier : Supplier<R>, 初始元素, 一般是一个集合
  • accumulator : BiConsumer<R, ? super T>, 叠加器, 确定如何把元素 T 添加进初始集合 R
  • combiner : BiConsumer<R, R>, 合并结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// reduce 实现 filter 功能
public static <T> List<T> filter(Stream<T> stream, Predicate<T> predicate) {
return stream.reduce(new ArrayList<T>(),
(acc, t) -> {
if (predicate.test(t)) {
List<T> lists = new ArrayList<T>(acc);
lists.add(t);
return lists;
}
return acc;
},
(List<T> left, List<T> right) -> {
List<T> lists = new ArrayList<T>(left);
lists.addAll(right);
return lists;
});
}
// collect 实现 filter 功能
public static <T> List<T> filter(Stream<T> stream, Predicate<T> predicate) {
return stream.collect(ArrayList::new,
(acc, t) -> {
if (predicate.test(t))
acc.add(t);
},
ArrayList::addAll);
}

// collect 实现 toMap 功能
stream.collect(HashMap::new, (acc, e) -> {acc.put(K, V)}, HashMap:putAll);
  • collect(Collector c)

将流转换为其他形式。接收一个 Collector 接口的实现。用于给 Stream 中元素做汇总方法

Collectors 实用类提供了很多静态方法,有收集、统计、规约等操作

3.4.1. 集合

创建相应集合,并返回

  • toList()
  • toSet()
  • toMap()
  • toCollection()
1
2
List employees = list.stream().collect(Collectors.toList());
Collection employees = list.stream().collect(Collectors.toCollection(ArrayList::new));
3.4.1.1.toMap()

有 3 种重载方法

  • toMap(keyMapper, valueMapper)
  • toMap(keyMapper, valueMapper, mergeFunction)
  • toMap(keyMapper, valueMapper, mergeFunction, mapSupplier)

参数含义:

  • keyMapper : Function<? super T, ? extends K>, key 的映射函数
  • valueMapper : Function<? super T, ? extends U>, value 的映射函数
  • mergeFunction : BinaryOperator<U>, 当 key 冲突时, 调用的合并方法, 例如 (k1,k2)->k1, 如果 k1 和 k2 的 key 相同, 则选择 k1 的 value 值
  • mapSupplier : Supplier<M>, 默认返回 HashMap, 可以使用它返回其他类型 map

3.4.2. 统计

  • counting():计算元素个数
  • summingInt():返回 int,计算 Integer 元素和。同样有 Double、Long 类型
  • averaingInt():返回 double,计算 Integer 元素平均值。同样有 Double、Long 类型
  • summarizingInt():返回 IntSummaryStatistics,收集 Integer 元素的统计值。同样有 Double、Long 类型

IntSummaryStatistics 类有相关统计属性可以调用。

1
2
3
4
5
6
7
8
9
10
long count = list.stream().collect(Collectors.counting());
int total = list.stream().collect(Collectors.summingInt(Employee::getSalary));
double avg = list.stream().collect(Collectors.averaingInt(Employee::getSalary));

IntSummaryStatistics iss = list.stream().collect(Collectors.summarizingInt(Employee::getSalary));
System.out.println(iss.getCount()); // 数量
System.out.println(iss.getSum()); // 求和
System.out.println(iss.getAverage)); // 平均值
System.out.println(iss.getMax()); // 最大值
System.out.println(iss.getMin()); // 最小值
  • maxBy():返回 Optional,根据比较器选择最大值
  • minBy():返回 Optional,根据比较器选择最小值
    1
    2
    3
    Stream<String> stream = Stream.of("1", "2", "4", "8");
    Optional<String> max = stream.collect(Collectors.maxBy(String::compareTo));
    Optional<String> min= stream.collect(Collectors.minBy(String::compareTo));

3.4.3. 拼接

  • joining():拼接流中的每个字符串
    1
    2
    3
    4
    // Collectors.joining() : 直接拼接
    // Collectors.joining(String s) : 拼接符
    // Collectors.joining(String s, String prefix, String suffix) : 拼接符,前缀,后缀
    String result = Stream.of("springboot", "mysql", "flink","kafka").collect(Collectors.joining(",", "[", "]"));

3.4.4. 分组

  • groupingBy(Function<T, K> f):返回 Map<K, List>,根据属性分组,分组属性为 K,分组结果为 V
  • partitioningBy(Predicate<T> p):返回 Map<Boolean, List>,根据 true 和 false 分组
    1
    2
    3
    4
    Map<String, List<Employee>> map1 = list.stream().collect(Collectors.groupingBy(Employee::getGender));
    Map<Boolean, List<Employee>> map2 = list.stream().collect(Collectors.partitioningBy(Employee::getManage));
    // 分组求和
    Map<String, Long> map3 = list.stream().collect(Collectors.groupingBy(Employee::getGender, Collectors.counting()));

3.4.5. 其他

  • collectingAndThen(Collector<T, A, R> downstream, Function<R, RR> finisher)
    它可接受两个参数,第一个参数用于 map 操作且其为 Collector 子类,所以 Collectors 类中的大多数方法都能使用,而第二参数用于 reduce 操作,是一个 Function 函数。经过第一个参数处理,收集结果集,再给第二个参数处理。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // 例 : 将数组中的数字,先乘2再求平均值,再求平均值的平方
    List<Integer> list = Arrays.asList(2, 4, 6, 8);
    Double result = list.stream().collect(
    Collectors.collectingAndThen(
    Collectors.averagingLong(e -> {
    return e * 2;
    }),
    avg -> {
    return avg * avg;
    })
    );
    System.out.println(result); // 100
  • reducing(BinaryOperator<T> op)
    参数是二目运算函数参数,取集合中前两个元素 a,b 进行计算,结果赋给 a 元素,再将第三个元素赋给 b 元素,再进行计算,直到遍历完所有元素,返回最终结果
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 例 : 找出每个组员工中最短的名字
    List<Team> teamList = List.of(
    (new Team("Dev","Tom")),
    (new Team("HR","Amy")),
    (new Team("HR","Jack")),
    (new Team("Dev","Andrew"))
    );

    Map<String, Optional<Team>> map = teamList.stream().collect(
    Collectors.groupingBy(
    e -> e.getJob(),
    Collectors.reducing(
    (a,b) -> a.getName().length() < b.getName().length() ? a : b
    )
    )
    );
    map.forEach((a,b) -> System.out.println(a + " : " + b.get().getName()));
    // Dev : Tom
    // HR : Amy
  • mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream)
    一般用于多重 MapReduce 中,第一个参数进行先处理,结果再传给第二个参数进行处理
    1
    2
    3
    4
    5
    6
    7
    8
    // 例 : 将字符串全部大写后,拼接
    List<String> list = Arrays.asList("hello","world","java");
    String name = list.stream().collect(Collectors.mapping(s -> s.toUpperCase(), Collectors.joining(",", "[", "]")));
    System.out.println(name);
    name = list.stream().map(s -> s.toUpperCase()).collect(Collectors.joining(",", "[", "]"));
    System.out.println(name);
    // [HELLO,WORLD,JAVA]
    // [HELLO,WORLD,JAVA]
    可见,其与先 map 再 collect 结果一致