13、第十四章流式编程

流的一个核心好处是,它使得程序更加短小并且更易理解。当 Lambda 表达式和方法引用(method references)和流一起使用的时候会让人感觉自成一体。流使得 Java 8 更具吸引力。

流式编程采用内部迭代

流是懒加载的。

流支持

Java 8 采用的解决方案是:在接口中添加被 default(默认)修饰的方法。通过这种方案,设计者们可以将流式(stream)方法平滑地嵌入到现有类中。流方法预置的操作几乎已满足了我们平常所有的需求。流操作的类型有三种:创建流修改流元素(中间操作, Intermediate Operations),消费流元素(终端操作, Terminal Operations)。最后一种类型通常意味着收集流元素(通常是到集合中)。

流创建

可以通过 Stream.of() 很容易地将一组元素转化成为流。

Stream.of(new Bubble(1), new Bubble(2), new Bubble(3)).forEach(System.out::println);
Stream.of("It's ", "a ", "wonderful ", "day ", "for ", "pie!").forEach(System.out::print);
Stream.of(3.14159, 2.718, 1.618).forEach(System.out::println);

每个集合都可以通过调用 stream() 方法来产生一个流。

Set<String> w = new HashSet<>(Arrays.asList("It's a wonderful day for pie!".split(" ")));
w.stream().map(x -> x + " ").forEach(System.out::print);

随机数流

public class RandomGenerators {
	public static <T> void show(Stream<T> stream) {
		stream.limit(4).forEach(System.out::println);
		System.out.println("++++++++");
	}

	public static void main(String[] args) {
		Random rand = new Random(47);
		show(rand.ints().boxed());
		show(rand.longs().boxed());
		show(rand.doubles().boxed());
		// Control the lower and upper bounds:
		show(rand.ints(10, 20).boxed());
		show(rand.longs(50, 100).boxed());
		show(rand.doubles(20, 30).boxed());
		// Control the stream size:
		show(rand.ints(2).boxed());
		show(rand.longs(2).boxed());
		show(rand.doubles(2).boxed());
		// Control the stream size and bounds:
		show(rand.ints(3, 3, 9).boxed());
		show(rand.longs(3, 12, 22).boxed());
		show(rand.doubles(3, 11.5, 12.3).boxed());
	}
}

int 类型的范围

IntStream.range():

System.out.println(IntStream.range(10, 20).sum());

public static void repeat(int n, Runnable action) {
	IntStream.range(0, n).forEach(i -> action.run());
}

generate()

如果要创建包含相同对象的流,只需要传递一个生成那些对象 lambda 到generate() 中。

public class Generator implements Supplier<String> {
	Random rand = new Random(47);
	char[] letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();

	@Override
	public String get() {
		return "" + letters[rand.nextInt(letters.length)];
	}

	public static void main(String[] args) {
		String word = Stream.generate(new Generator()).limit(30).collect(Collectors.joining());
		System.out.println(word);
	}
}

Stream.generate(() -> "duplicate").limit(3).forEach(System.out::println);

iterate()

Stream.iterate() 以种子(第一个参数)开头,并将其传给方法(第二个参数)。方法的结果将添加到流,并存储作为第一个参数用于下次调用 iterate(),依次类推。

public class Fibonacci {
	int x = 1;

	Stream<Integer> numbers() {
		return Stream.iterate(0, i -> {
			int result = x + i;
			x = i;
			return result;
		});
	}

	public static void main(String[] args) {
		new Fibonacci().numbers().skip(20) // Don't use the first 20
				.limit(10) // Then take 10 of them
				.forEach(System.out::println);
	}
}

流的建造者模式

public class FileToWordsBuilder {
	Stream.Builder<String> builder = Stream.builder();

	public FileToWordsBuilder(String filePath) throws Exception {
		Files.lines(Paths.get(filePath)).skip(1) // Skip the comment line at the beginning
				.forEach(line -> {
					for (String w : line.split("[ .?,]+"))
						builder.add(w);
				});
	}

	Stream<String> stream() {
		return builder.build();
	}

	public static void main(String[] args) throws Exception {
		new FileToWordsBuilder("Cheese.dat").stream().limit(7).map(w -> w + " ").forEach(System.out::print);
	}
}

Arrays

Arrays 类中含有一个名为 stream() 的静态方法用于把数组转换成为流。

public class Machine2 {
	public static void main(String[] args) {
		Arrays.stream(new Operations[] { () -> Operations.show("Bing"), () -> Operations.show("Crack"),
				() -> Operations.show("Twist"), () -> Operations.show("Pop") }).forEach(Operations::execute);
	}
}

public class ArrayStreams {
	public static void main(String[] args) {
		Arrays.stream(new double[] { 3.14159, 2.718, 1.618 }).forEach(n -> System.out.format("%f ", n));
		System.out.println();
		Arrays.stream(new int[] { 1, 3, 5 }).forEach(n -> System.out.format("%d ", n));
		System.out.println();
		Arrays.stream(new long[] { 11, 22, 44, 66 }).forEach(n -> System.out.format("%d ", n));
		System.out.println();
		// Select a subrange:
		Arrays.stream(new int[] { 1, 3, 5, 7, 15, 28, 37 }, 3, 6).forEach(n -> System.out.format("%d ", n));
	}
}

正则表达式

Java 8 在 java.util.regex.Pattern 中增加了一个新的方法 splitAsStream()。这个方法可以根据传入的公式将字符序列转化为流。但是有一个限制,输入只能是 CharSequence,因此不能将流作为 splitAsStream() 的参数。

public class FileToWordsRegexp {
	private String all;

	public FileToWordsRegexp(String filePath) throws Exception {
		all = Files.lines(Paths.get(filePath)).skip(1) // First (comment) line
				.collect(Collectors.joining(" "));
	}

	public Stream<String> stream() {
		return Pattern.compile("[ .,?]+").splitAsStream(all);
	}

	public static void main(String[] args) throws Exception {
		FileToWordsRegexp fw = new FileToWordsRegexp("Cheese.dat");
		fw.stream().limit(7).map(w -> w + " ").forEach(System.out::print);
		fw.stream().skip(7).limit(2).map(w -> w + " ").forEach(System.out::print);
	}
}

中间操作

中间操作用于从一个流中获取对象,并将对象作为另一个流从后端输出,以连接到其他操作。

跟踪和调试

peek() 操作的目的是帮助调试。它允许你无修改地查看流中的元素。

class Peeking {
	public static void main(String[] args) throws Exception {
		FileToWords.stream("Cheese.dat").skip(21).limit(4).map(w -> w + " ").peek(System.out::print)
				.map(String::toUpperCase).peek(System.out::print).map(String::toLowerCase).forEach(System.out::print);
	}
}

流元素排序

public class SortedComparator {
	public static void main(String[] args) throws Exception {
		FileToWords.stream("Cheese.dat").skip(10).limit(10).sorted(Comparator.reverseOrder()).map(w -> w + " ")
				.forEach(System.out::print);
	}
}

sorted() 预设了一些默认的比较器。这里我们使用的是反转“自然排序”。当然你也可以把 Lambda 函数作为参数传递给 sorted()

移除元素

  • distinct():在 Randoms.java 类中的 distinct() 可用于消除流中的重复元素。相比创建一个 Set 集合,该方法的工作量要少得多。
  • filter(Predicate):过滤操作会保留与传递进去的过滤器函数计算结果为 true 元素。
public class Prime {
	public static boolean isPrime(long n) {
		return rangeClosed(2, (long) Math.sqrt(n)).noneMatch(i -> n % i == 0);
	}

	public LongStream numbers() {
		return iterate(2, i -> i + 1).filter(Prime::isPrime);
	}

	public static void main(String[] args) {
		new Prime().numbers().limit(10).forEach(n -> System.out.format("%d ", n));
		System.out.println();
		new Prime().numbers().skip(90).limit(10).forEach(n -> System.out.format("%d ", n));
	}
}

应用函数到元素

  • map(Function):将函数操作应用在输入流的元素中,并将返回值传递到输出流中。
  • mapToInt(ToIntFunction):操作同上,但结果是 IntStream。
  • mapToLong(ToLongFunction):操作同上,但结果是 LongStream。
  • mapToDouble(ToDoubleFunction):操作同上,但结果是 DoubleStream。

使用map() 映射多种函数到一个字符串流中:

class FunctionMap {
	static String[] elements = { "12", "", "23", "45" };

	static Stream<String> testStream() {
		return Arrays.stream(elements);
	}

	static void test(String descr, Function<String, String> func) {
		System.out.println(" ---( " + descr + " )---");
		testStream().map(func).forEach(System.out::println);
	}

	public static void main(String[] args) {

		test("add brackets", s -> "[" + s + "]");

		test("Increment", s -> {
			try {
				return Integer.parseInt(s) + 1 + "";
			} catch (NumberFormatException e) {
				return s;
			}
		});

		test("Replace", s -> s.replace("2", "9"));

		test("Take last digit", s -> s.length() > 0 ? s.charAt(s.length() - 1) + "" : s);
	}
}

map() 将一个字符串映射为另一个字符串,但是我们完全可以产生和接收类型完全不同的类型,从而改变流的数据类型。

class Numbered {
	final int n;

	Numbered(int n) {
		this.n = n;
	}

	@Override
	public String toString() {
		return "Numbered(" + n + ")";
	}
}

class FunctionMap2 {
	public static void main(String[] args) {
		Stream.of(1, 5, 7, 9, 11, 13).map(Numbered::new).forEach(System.out::println);
	}
}

如果使用 Function 返回的结果是数值类型的一种,我们必须使用合适的 mapTo数值类型 进行替代。

class FunctionMap3 {
	public static void main(String[] args) {
		Stream.of("5", "7", "9").mapToInt(Integer::parseInt).forEach(n -> System.out.format("%d ", n));
		System.out.println();
		Stream.of("17", "19", "23").mapToLong(Long::parseLong).forEach(n -> System.out.format("%d ", n));
		System.out.println();
		Stream.of("17", "1.9", ".23").mapToDouble(Double::parseDouble).forEach(n -> System.out.format("%f ", n));
	}
}

在 map() 中组合流

  • flatMap():获取流产生( stream-producing)函数,并将其应用于新到的元素(如 map() 所做的),然后获取每一个流并将其“扁平”为元素。所以它的输出只是元素。
  • flatMap(Function):当 Function 产生流时使用。
  • flatMapToInt(Function):当 Function 产生 IntStream 时使用。
  • flatMapToLong(Function):当 Function 产生 LongStream 时使用。
  • flatMapToDouble(Function):当 Function 产生 DoubleStream 时使用。
public class StreamOfRandoms {
	static Random rand = new Random(47);

	public static void main(String[] args) {
		Stream.of(1, 2, 3, 4, 5).flatMapToInt(i -> IntStream.concat(rand.ints(0, 100).limit(i), IntStream.of(-1)))
				.forEach(n -> System.out.format("%d ", n));
	}
}

Optional类

Optional 可以实现这样的功能。
可作为流元素的持有者,即使查看的元素不存在也能友好的提示我们(也就是说,没有异常)。
首先确保准流操作返回 Optional 对象,因为它们并不能保证预期结果一定存在。它们包括:

  • findFirst() 返回一个包含第一个元素的 Optional 对象,如果流为空则返回 Optional.empty
  • findAny() 返回包含任意元素的 Optional 对象,如果流为空则返回 Optional.empty
  • max() 和 min() 返回一个包含最大值或者最小值的 Optional 对象,如果流为空则返回 Optional.empty
  • reduce() 不再以 identity 形式开头,而是将其返回值包装在 Optional 中。(identity 对象成为其他形式的 reduce() 的默认结果,因此不存在空结果的风险)
  • 对于数字流 IntStream、LongStream 和 DoubleStream,average() 会将结果包装在 Optional 以防止流为空。
class OptionalsFromEmptyStreams {
	public static void main(String[] args) {
		System.out.println(Stream.<String>empty().findFirst());
		System.out.println(Stream.<String>empty().findAny());
		System.out.println(Stream.<String>empty().max(String.CASE_INSENSITIVE_ORDER));
		System.out.println(Stream.<String>empty().min(String.CASE_INSENSITIVE_ORDER));
		System.out.println(Stream.<String>empty().reduce((s1, s2) -> s1 + s2));
		System.out.println(IntStream.empty().average());
	}
}

当你接收到 Optional 对象时,应首先调用 isPresent() 检查其中是否包含元素。如果存在,可使用 get() 获取。

class OptionalBasics {
	static void test(Optional<String> optString) {
		if (optString.isPresent())
			System.out.println(optString.get());
		else
			System.out.println("Nothing inside!");
	}

	public static void main(String[] args) {
		test(Stream.of("Epithets").findFirst());
		test(Stream.<String>empty().findFirst());
	}
}

便利函数

有许多便利函数可以解包 Optional ,这简化了上述“对所包含的对象的检查和执行操作”的过程:

  • ifPresent(Consumer):当值存在时调用 Consumer,否则什么也不做。
  • orElse(otherObject):如果值存在则直接返回,否则生成 otherObject。
  • orElseGet(Supplier):如果值存在直接生成对象,否则使用 Supplier 函数生成一个可替代对象。
  • orElseThrow(Supplier):如果值存在直接生成对象,否则使用 Supplier 函数生成一个异常。
public class Optionals {
	static void basics(Optional<String> optString) {
		if (optString.isPresent())
			System.out.println(optString.get());
		else
			System.out.println("Nothing inside!");
	}

	static void ifPresent(Optional<String> optString) {
		optString.ifPresent(System.out::println);
	}

	static void orElse(Optional<String> optString) {
		System.out.println(optString.orElse("Nada"));
	}

	static void orElseGet(Optional<String> optString) {
		System.out.println(optString.orElseGet(() -> "Generated"));
	}

	static void orElseThrow(Optional<String> optString) {
		try {
			System.out.println(optString.orElseThrow(() -> new Exception("Supplied")));
		} catch (Exception e) {
			System.out.println("Caught " + e);
		}
	}

	static void test(String testName, Consumer<Optional<String>> cos) {
		System.out.println(" === " + testName + " === ");
		cos.accept(Stream.of("Epithets").findFirst());
		cos.accept(Stream.<String>empty().findFirst());
	}

	public static void main(String[] args) {
		test("basics", Optionals::basics);
		test("ifPresent", Optionals::ifPresent);
		test("orElse", Optionals::orElse);
		test("orElseGet", Optionals::orElseGet);
		test("orElseThrow", Optionals::orElseThrow);
	}
}

创建 Optional

当我们在自己的代码中加入 Optional 时,可以使用下面 3 个静态方法:

  • empty():生成一个空 Optional。
  • of(value):将一个非空值包装到 Optional 里。
  • ofNullable(value):针对一个可能为空的值,为空时自动生成 Optional.empty,否则将值包装在 Optional 中。
class CreatingOptionals {
	static void test(String testName, Optional<String> opt) {
		System.out.println(" === " + testName + " === ");
		System.out.println(opt.orElse("Null"));
	}

	public static void main(String[] args) {
		test("empty", Optional.empty());
		test("of", Optional.of("Howdy"));
		try {
			test("of", Optional.of(null));
		} catch (Exception e) {
			System.out.println(e);
		}
		test("ofNullable", Optional.ofNullable("Hi"));
		test("ofNullable", Optional.ofNullable(null));
	}
}

Optional 对象操作

当我们的流管道生成了 Optional 对象,下面 3 个方法可使得 Optional 的后续能做更多的操作:

  • filter(Predicate):将 Predicate 应用于 Optional 中的内容并返回结果。当 Optional 不满足 Predicate 时返回空。如果 Optional 为空,则直接返回。
  • map(Function):如果 Optional 不为空,应用 Function 于 Optional 中的内容,并返回结果。否则直接返回 Optional.empty。
  • flatMap(Function):同 map(),但是提供的映射函数将结果包装在 Optional 对象中,因此 flatMap() 不会在最后进行任何包装。

一般来说,流的 filter() 会在 Predicate 返回 false 时删除流元素。而 Optional.filter() 在失败时不会删除 Optional,而是将其保留下来,并转化为空。

map() 一样 , Optional.map() 应用于函数。它仅在 Optional 不为空时才应用映射函数,并将 Optional 的内容提取到映射函数。

Optional 的 flatMap() 应用于已生成 Optional 的映射函数,所以 flatMap() 不会像 map() 那样将结果封装在 Optional 中。

Optional 流

public class Signal {
	private final String msg;

	public Signal(String msg) {
		this.msg = msg;
	}

	public String getMsg() {
		return msg;
	}

	@Override
	public String toString() {
		return "Signal(" + msg + ")";
	}

	static Random rand = new Random(47);

	public static Signal morse() {
		switch (rand.nextInt(4)) {
		case 1:
			return new Signal("dot");
		case 2:
			return new Signal("dash");
		default:
			return null;
		}
	}

	public static Stream<Optional<Signal>> stream() {
		return Stream.generate(Signal::morse).map(signal -> Optional.ofNullable(signal));
	}
}

public class StreamOfOptionals {
	public static void main(String[] args) {
		Signal.stream().limit(10).forEach(System.out::println);
		System.out.println(" ---");
		Signal.stream().limit(10).filter(Optional::isPresent).map(Optional::get).forEach(System.out::println);
	}
}

终端操作

这些操作获取一个流并产生一个最终结果;它们不会像后端流提供任何东西。因此,终端操作总是你在管道中做的最后一件事情。

转化成数组(Convert to an Array)

  • toArray():将流转换成适当类型的数组。
  • toArray(generator):在特殊情况下,生成器用于分配你自己的数组存储。
public class RandInts {
	private static int[] rints = new Random(47).ints(0, 1000).limit(100).toArray();

	public static IntStream rands() {
		return Arrays.stream(rints);
	}
}

对每个元素应用最终操作(Apply a Final Operation to Every Element)

  • forEach(Consumer):你已经看到很多次 System.out::println 作为 Consumer 函数。
  • forEachOrdered(Consumer): 这个形式保证了 forEach 的操作顺序是原始流顺序。
public class ForEach {
	static final int SZ = 1000;

	public static void main(String[] args) {
		rands().limit(SZ).forEach(n -> System.out.format("%d ", n));
		System.out.println();
		rands().limit(SZ).parallel().forEach(n -> System.out.format("%d ", n));
		System.out.println();
		rands().limit(SZ).parallel().forEachOrdered(n -> System.out.format("%d ", n));
		rands().limit(SZ).parallel().forEachOrdered(n -> {
			System.out.println(Thread.currentThread().getName() + " -=- " + n);
		});
	}
}

收集(Collecting)

  • collect(Collector):使用 Collector 来累计流元素到结果集合中。
  • collect(Supplier, BiConsumer, BiConsumer):同上,但是 Supplier 创建了一个新的结果集合,第一个 BiConsumer 是将下一个元素包含在结果中的函数,而第二个 BiConsumer 是用于将两个值组合起来。

组合所有流元素(Combining All Stream Elements)

  • reduce(BinaryOperator):使用 BinaryOperator 来组合所有流中的元素。因为流可能为空,其返回值为 Optional。
  • reduce(identity, BinaryOperator):功能同上,但是使用 identity 作为其组合的初始值。因此如果流为空,identity 就是结果。
  • reduce(identity, BiFunction, BinaryOperator):这个形式更为复杂(所以我们不会介绍它),在这里被提到是因为它使用起来会更有效。通常,你可以显示的组合 map() 和 reduce() 来更简单的表达这一点。

匹配(Matching)

  • allMatch(Predicate) :如果流的每个元素根据提供的 Predicate 都返回 true 时,结果返回为 true。这个操作将会在第一个 false 之后短路;也就是不会在发生 false 之后继续执行计算。
  • anyMatch(Predicate):如果流中的一个元素根据提供的 Predicate 返回 true 时,结果返回为 true。这个操作将会在第一个 true 之后短路;也就是不会在发生 true 之后继续执行计算。
  • noneMatch(Predicate):如果流的每个元素根据提供的 Predicate 都返回 false 时,结果返回为 true。这个操作将会在第一个 true 之后短路;也就是不会在发生 true 之后继续执行计算。
interface Matcher extends BiPredicate<Stream<Integer>, Predicate<Integer>> {
}

public class Matching {
	static void show(Matcher match, int val) {
		System.out.println(
				match.test(IntStream.rangeClosed(1, 9).boxed().peek(n -> System.out.format("%d ", n)), n -> n < val));
	}

	public static void main(String[] args) {
		show(Stream::allMatch, 10);
		show(Stream::allMatch, 4);
		show(Stream::anyMatch, 2);
		show(Stream::anyMatch, 0);
		show(Stream::noneMatch, 5);
		show(Stream::noneMatch, 0);
	}
}

选择元素

  • findFirst():返回一个含有第一个流元素的 Optional,如果流为空返回 Optional.empty。
  • findAny():返回含有任意流元素的 Optional,如果流为空返回 Optional.empty。

信息(Informational)

  • count():流中的元素个数。
  • max(Comparator):根据所传入的 Comparator 所决定的“最大”元素。
  • min(Comparator):根据所传入的 Comparator 所决定的“最小”元素。
public class Informational {
	public static void main(String[] args) throws Exception {
		System.out.println(FileToWords.stream("Cheese.dat").count());
		System.out.println(FileToWords.stream("Cheese.dat").min(String.CASE_INSENSITIVE_ORDER).orElse("NONE"));
		System.out.println(FileToWords.stream("Cheese.dat").max(String.CASE_INSENSITIVE_ORDER).orElse("NONE"));
	}
}

数字流信息(Information for Numeric Streams)

  • average() :求取流元素平均值。
  • max() 和 min():因为这些操作在数字流上面,所以不需要 Comparator。
  • sum():对所有流元素进行求和。
  • summaryStatistics():生成可能有用的数据。目前还不太清楚他们为什么觉得有必要这样做,但是你可以直接使用方法产生所有的数据。
public class NumericStreamInfo {
	public static void main(String[] args) {
		System.out.println(rands().average().getAsDouble());
		System.out.println(rands().max().getAsInt());
		System.out.println(rands().min().getAsInt());
		System.out.println(rands().sum());
		System.out.println(rands().summaryStatistics());
	}
}