Java Stream API 是 Java 8 引入的最重要的特性之一。它提供了一种声明式的数据处理方式,让你可以用简洁优雅的代码完成复杂的数据集合操作。本文将从 Stream 的创建开始,逐步深入到各种中间操作、终端操作、并行流以及自定义收集器,帮助你真正掌握 Stream API 的精髓。

1. Stream 的创建方式

在使用 Stream 之前,首先要创建 Stream 对象。Java 提供了多种创建 Stream 的方式,覆盖了日常开发中的大部分场景。

1.1 从集合创建

这是最常用的方式。Java 8 为 Collection 接口新增了 stream()parallelStream() 方法,可以直接将集合转为流:

List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream = list.stream();                // 串行流
Stream<String> parallel = list.parallelStream();       // 并行流

1.2 从数组创建

使用 Arrays.stream() 可以将数组转为流,支持指定范围:

String[] array = {"a", "b", "c", "d", "e"};
Stream<String> fullStream = Arrays.stream(array);           // 全部元素
Stream<String> rangeStream = Arrays.stream(array, 1, 4);    // 索引 1 到 3: "b", "c", "d"

int[] numbers = {1, 2, 3, 4, 5};
IntStream intStream = Arrays.stream(numbers);                // 基本类型数组
// 基本类型流有 IntStream、LongStream、DoubleStream,避免装箱开销

1.3 使用 Stream.of 和 Stream.iterate

Stream.of() 直接传入元素创建流,Stream.iterate() 可以生成无限的序列流:

// 静态工厂方法
Stream<String> ofStream = Stream.of("a", "b", "c");
Stream<Integer> empty = Stream.empty();                     // 空流

// Stream.iterate 生成无限流
Stream<Integer> fibonacci = Stream.iterate(
    new int[]{0, 1},
    f -> new int[]{f[1], f[0] + f[1]}
).map(f -> f[0]);
fibonacci.limit(10).forEach(n -> System.out.print(n + " ")); // 0 1 1 2 3 5 8 13 21 34

// Stream.generate 生成常量流
Stream<Double> randomStream = Stream.generate(Math::random).limit(5);

提示:对于基本类型(int、long、double),优先使用 IntStream、LongStream、DoubleStream 避免自动装箱带来的性能开销。

2. 中间操作详解

中间操作返回一个新的 Stream,可以链式调用多个中间操作。中间操作是惰性的,只有在遇到终端操作时才会实际执行。

2.1 filter - 筛选

根据 Predicate 条件过滤元素,保留满足条件的元素:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

List<Integer> evens = numbers.stream()
    .filter(n -> n % 2 == 0)
    .collect(Collectors.toList());
// 结果: [2, 4, 6, 8]

// 复杂条件过滤
List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");
List<String> filtered = names.stream()
    .filter(name -> name.startsWith("A") || name.length() > 4)
    .collect(Collectors.toList());
// 结果: ["Alice", "Charlie", "David"]

2.2 map - 映射

将每个元素通过 Function 转换为另一个元素,是一对一的转换:

List<String> words = Arrays.asList("hello", "world", "java");
List<String> upper = words.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
// 结果: ["HELLO", "WORLD", "JAVA"]

// 提取对象属性
List<User> users = getUserList();
List<String> names = users.stream()
    .map(User::getName)
    .collect(Collectors.toList());

2.3 flatMap - 扁平化映射

将每个元素映射为一个 Stream,然后将所有 Stream 合并为一个。适合处理嵌套集合:

List<List<Integer>> nested = Arrays.asList(
    Arrays.asList(1, 2),
    Arrays.asList(3, 4, 5),
    Arrays.asList(6, 7, 8, 9)
);

List<Integer> flattened = nested.stream()
    .flatMap(Collection::stream)
    .collect(Collectors.toList());
// 结果: [1, 2, 3, 4, 5, 6, 7, 8, 9]

// 按单词拆分为字母
List<String> words = Arrays.asList("hello", "world");
List<String> letters = words.stream()
    .flatMap(w -> Arrays.stream(w.split("")))
    .distinct()
    .collect(Collectors.toList());
// 结果: ["h", "e", "l", "o", "w", "r", "d"]

2.4 其他常用中间操作

  • distinct():去重,基于元素的 equals 方法
  • sorted():排序,可传入自定义 Comparator
  • peek():偷看每个元素,主要用于调试
  • limit(n):截取前 n 个元素
  • skip(n):跳过前 n 个元素
List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3);

List<Integer> result = numbers.stream()
    .distinct()                         // 去重: [3, 1, 4, 5, 9, 2, 6]
    .sorted()                           // 排序: [1, 2, 3, 4, 5, 6, 9]
    .skip(2)                            // 跳过前2个: [3, 4, 5, 6, 9]
    .limit(3)                           // 取前3个: [3, 4, 5]
    .peek(System.out::println)          // 调试打印
    .collect(Collectors.toList());
// 结果: [3, 4, 5]

3. 终端操作详解

终端操作会触发 Stream 的实际执行,执行完毕后 Stream 被消费,不可再使用。终端操作返回一个结果或产生副作用。

3.1 collect - 收集结果

collect 是最灵活的终端操作,通过 Collectors 工具类可以收集到各种容器中:

List<String> items = Arrays.asList("apple", "banana", "apple", "orange", "banana", "apple");

// 收集到 List
List<String> list = items.stream().collect(Collectors.toList());

// 收集到 Set(自动去重)
Set<String> set = items.stream().collect(Collectors.toSet());

// 收集到 Map(分组计数)
Map<String, Long> countMap = items.stream()
    .collect(Collectors.groupingBy(
        Function.identity(),
        Collectors.counting()
    ));
// 结果: {apple=3, banana=2, orange=1}

// 收集到不可变集合
List<String> unmodifiable = items.stream()
    .collect(Collectors.toUnmodifiableList());

// 拼接字符串
String joined = items.stream()
    .distinct()
    .collect(Collectors.joining(", "));
// 结果: "apple, banana, orange"

3.2 forEach - 遍历

对每个元素执行 Consumer 操作,常用于打印或副作用操作:

IntStream.rangeClosed(1, 5)
    .forEach(i -> System.out.print(i + " "));
// 输出: 1 2 3 4 5

// 注意:forEach 不保证顺序(并行流下),需要顺序请用 forEachOrdered
IntStream.rangeClosed(1, 5)
    .parallel()
    .forEachOrdered(i -> System.out.print(i + " "));
// 输出: 1 2 3 4 5(保持顺序)

3.3 reduce - 归约

将 Stream 元素反复组合,归约为一个值。适合求和、求积、求最大值等场景:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// 求和
int sum = numbers.stream().reduce(0, Integer::sum);
// 结果: 15

// 求最大值
int max = numbers.stream().reduce(Integer.MIN_VALUE, Integer::max);
// 结果: 5

// 无初始值的 reduce(返回 Optional)
Optional<Integer> optionalSum = numbers.stream().reduce(Integer::sum);

// 复杂归约:拼接字符串
List<String> words = Arrays.asList("Hello", "Stream", "API");
String sentence = words.stream()
    .reduce("", (a, b) -> a + " " + b)
    .trim();
// 结果: "Hello Stream API"

3.4 count - 计数

统计 Stream 中元素的数量,是最简单的终端操作:

long total = IntStream.rangeClosed(1, 100)
    .filter(n -> n % 7 == 0)
    .count();
// 结果: 14(1到100中7的倍数有14个)

4. 中间操作 vs 终端操作

理解中间操作和终端操作的区别是掌握 Stream API 的关键。下面是两者的详细对比:

对比维度 中间操作 终端操作
执行时机 惰性(lazy),不触发实际计算 即刻(eager),触发整个流水线执行
返回值 返回一个新的 Stream 返回一个结果或 void
链式调用 可以无限链式调用 只能作为最后一个操作
Stream 状态 Stream 仍可用 Stream 被消费,不可再用
常见例子 filter, map, flatMap, distinct, sorted, peek, limit, skip collect, forEach, reduce, count, anyMatch, findFirst, min, max
短路支持 有限支持(limit, skip) 支持(anyMatch, findFirst, findAny)

关键理解:中间操作构建的是数据处理流水线,终端操作才是启动流水线的开关。没有终端操作,中间操作永远不会执行。这种设计让 Stream 可以高效地进行短路优化和并行处理。

5. 并行流

并行流利用 Fork/Join 框架将任务拆分为多个子任务并行处理,可以充分利用多核 CPU 的计算能力。只需调用 parallelStream()stream().parallel() 即可启用。

但是,并行流并非银弹。使用不当反而会导致性能下降甚至结果错误:

// 并行流示例:计算 1 到 1_000_000 的和
long sum = LongStream.rangeClosed(1, 1_000_000)
    .parallel()
    .sum();
// 结果: 500000500000

// 并行流注意事项
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    numbers.add(i);
}

// ❌ 错误:共享可变状态
int[] wrongSum = {0};
numbers.parallelStream()
    .forEach(n -> wrongSum[0] += n);  // 线程不安全,结果不可预测

// ✅ 正确:使用 reduce 或 collect
int correctSum = numbers.parallelStream()
    .reduce(0, Integer::sum);

// ✅ 正确:使用线程安全的收集方式
List<Integer> result = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .collect(Collectors.toList());

使用并行流时,记住几个原则:

  • 数据量足够大时才有并行收益(通常 10_000 以上)
  • 避免共享可变状态,不要使用 forEach 修改外部变量
  • 使用线程安全的收集器(如 collect(Collectors.toList())
  • 使用 FindFirst/Limit 的并行流会额外开销,因为要保持顺序
  • 如果不在意顺序,使用 findAny()unordered() 可以提升性能

6. 自定义收集器

Collectors 工具类提供的方法不能满足需求时,可以实现 Collector 接口来自定义收集器。一个收集器包含四个核心方法:

  • supplier():创建一个可变的结果容器
  • accumulator():将元素添加到结果容器
  • combiner():合并两个结果容器(并行流使用)
  • finisher():对结果容器做最终转换
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

// 自定义收集器:将数字收集为统计对象(总和、数量、平均值)
public class NumberStats {
    private long count;
    private long sum;

    public NumberStats() {}

    public void accept(int value) {
        count++;
        sum += value;
    }

    public void combine(NumberStats other) {
        this.count += other.count;
        this.sum += other.sum;
    }

    public double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    @Override
    public String toString() {
        return String.format("count=%d, sum=%d, average=%.2f", count, sum, average());
    }
}

// 使用自定义收集器
List<Integer> numbers = Arrays.asList(10, 20, 30, 40, 50);
NumberStats stats = numbers.stream()
    .collect(
        NumberStats::new,           // supplier
        NumberStats::accept,        // accumulator
        NumberStats::combine        // combiner
    );

System.out.println(stats);
// 输出: count=5, sum=150, average=30.00

如果希望实现更复杂的收集器(如带特性标记的),可以实现 Collector 接口:

// 实现 Collector 接口的自定义收集器
Collector<Integer, NumberStats, NumberStats> statsCollector =
    Collector.of(
        NumberStats::new,
        NumberStats::accept,
        NumberStats::combine,
        Collector.Characteristics.IDENTITY_FINISH   // finisher 是恒等操作
    );

NumberStats stats2 = numbers.stream().collect(statsCollector);
System.out.println(stats2);

// Collectors 的常见内置收集器
// - toList(), toSet(), toMap()
// - groupingBy(), partitioningBy()
// - joining(), counting(), summarizingInt()
// - mapping(), flatMapping(), filtering()  // Java 9+

掌握 Stream API 不仅仅是记住几个方法,更重要的是理解它的设计思想:声明式编程、惰性求值、流水线操作和并行友好。在日常开发中,建议先用 Stream 简化集合操作代码,再逐步尝试编写自定义收集器和并行流优化。当你习惯了 Stream 的编程模式后,你会发现代码变得前所未有的简洁和优雅。