原创

Java JDK9 Flow API特性详解

简介

响应式编程, 这是一种数据消费者控制数据流的编程方式。需要指出是,当消费速度低于生产速度时,消费者要求生产者降低速度以完全消费数据,这个现象称作背压(back pressure)。这种处理方式不是在制造混乱,你可能已经使用过这种模式,只是最近因为在主要框架和平台上使用才变得更流行,比如Java9,Spring5。另外在分布式系统中处理大规模数据传输时也使用到了这种模式。

回顾过去可以帮我们更好的理解这种模式。几年前,最常见的消费数据模式是pull-based。client端不断轮询服务端以获取数据。这种模式的优点是当client端资源有限时可以更好的控制数据流(停止轮询),而缺点是当服务端没有数据时轮询是对计算资源和网络资源的浪费。

随着时间推移,处理数据的模式转变为push-based,生产者不关心消费者的消费能力,直接推送数据。这种模式的缺点是当消费资源低于生产资源时会造成缓冲区溢出从而数据丢失,当丢失率维持在较小的数值时还可以接受,但是当这个比率变大时我们会希望生产者降速以避免大规模数据丢失。

响应式编程是一种pull-push混合模式以综合他们的优点,这种模式下消费者负责请求数据以控制生产者数据流,同时当处理资源不足时也可以选择阻断或者丢弃数据(消费者内部缓冲区,缓冲区满之后阻塞发布者submit方法,Flow默认缓冲区大小256),接下来我们会看到一个典型案例。

Flow与Stream

Java8中引入的StreamAPI通过map,reduce以及其他操作可以完美的处理数据集,而FlowAPI则专注于处理数据的流通,比如对数据的请求,减速,丢弃,阻塞等。同时你可以使用Streams作为数据源(publisher),当必要时阻塞丢弃其中的数据。你也可以在Subscriber中使用Streams以进行数据的归并操作。更值得一提的时reactive streams不仅兼容传统编程方式,而且还支持函数式编程以极大的提高可读性和可维护性。

如果你需要在两个系统间传输数据,同时进行转形操作,如何使用Flows和Streams来完成?这种情况下,我们使用Java8的Function来做数据转换,但是如何在Publisher和Subscriber之间使用StreamAPI呢?答案是我们可以在Publisher和Subscriber之间再加一个subscriber,她可以从最初的publisher获取数据,转换,然后再作为一个新的publisher,而使最初的subscriber订阅这个新的publisher,也是Java9中的接口Flow.Processor<T,R>,我们只需要实现这个接口并编写转换数据的functions。

Flow类定义

package java.util.concurrent;

public final class Flow {

    private Flow() {} // uninstantiable

    @FunctionalInterface
    public static interface Publisher<T> {

        public void subscribe(Subscriber<? super T> subscriber);
    }

    public static interface Subscriber<T> {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }

    public static interface Subscription {

        public void request(long n);

        public void cancel();
    }

    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    // 默认缓冲区大小256
    static final int DEFAULT_BUFFER_SIZE = 256;

    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

}

Interface Flow.Publisher<T>发布者

定义了生产数据和控制事件的方法。

Interface Flow.Subscriber<T>订阅者

定义了消费数据和事件的方法。

Interface Flow.Subscription订阅信息

定义了链接Publisher和Subscriber的方法。

Interface Flow.Processor<T, R>

定义了转换Publisher到Subscriber的方法,Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。

class SubmissionPublisher<T>

是Flow.Publisher<T>的实现,她可以灵活的生产数据,同时与Reactive Stream兼容。

应用场景

file

本文中给出的示例代码是以杂志出版商为模型。假设出版商有两个订阅客户。

出版商将为每个订阅客户出版20本杂志。出版商知道他们的客户有时在邮递杂志时会不在家,而当他们的邮箱(subscriber buffer)不巧被塞满时邮递员会退回或丢弃杂志,出版商不希望出现这种情况。

于是出版商发明了一个邮递系统:当客户在家时再给出版商致电,出版商会立即邮递一份杂志。出版商打算在办公室为每个客户保留一个小号的邮箱以防当杂志出版时客户没有第一时间致电获取。出版商认为为每个客户预留一个可以容纳8份杂志的邮件已经足够(publisher buffer)。

于是一名员工提出了以下不同的场景:

  1. 如果客户请求杂志足够迅速,将不会存在邮箱容量的问题。
  2. 如果客户没有以杂志出版的速度发出请求,那么邮箱将被塞满。这位员工提出以下几种处理方案:
    a. 增加邮箱容量,为每位客户提供可容纳20份杂志的邮箱。(publisher增加buffer)
    b. 直到客户请求下一份杂志之前停止印刷,并且根据客户请求的速度降低印刷速度以清空邮箱。
    c. 新的杂志直接丢掉。
    d. 一个折中的方案: 如果邮箱满了,在下次打印之前等待一段时间,如果还是没有足够的空间则丢弃新的杂志。

出版商无法承受花费过多的资源仅仅是因为一个速度慢的客户,那将是巨大的浪费,最终选择了方案d,最大程度上减少客户损失。

本文示例代码中选用了方案d是因为如果我们使用了一个虚拟的无穷buffer,这对理解Reactive模式的中概念是不利的,代码也将变得过于简易,无法与其他方案进行比较,接下来让我们来看代码吧。

Java9 Flow 代码示例

Subscriber订阅者

从订阅者开始,MagazineSubscriber实现了Flow.Subscriber<Integer>,订阅者将收到一个数字,但请假设这是一份杂志正如上面的使用场景提到的。class中实现了必要的方法如下:

  • onSubscriber(subscription): Publisher在被指定一个新的Subscriber时调用此方法。 一般来说你需要在subscriber内部保存这个subscrition实例,因为后面会需要通过她向publisher发送信号来完成:请求更多数据,或者取消订阅。 一般在这里我们会直接请求第一个数据,正如代码中所示。
  • onNext(magazineNumber): 每当新的数据产生,这个方法会被调用。在我们的示例中,我们用到了最经典的使用方式:处理这个数据的同时再请求下一个数据。然而我们在这中间添加了一段可配置的sleep时间,这样我们可以尝试订阅者在不同场景下的表现。剩下的一段逻辑判断仅仅是记录下丢失的杂志(当publisher出现丢弃数据的时候)。
  • onError(throwable): 当publisher出现异常时会调用subscriber的这个方法。在我们的实现中publisher丢弃数据时会产生异常。
  • onComplete(): 当publisher数据推送完毕时会调用此方法,于是整个订阅过程结束。
package com.weilai.webfluxdemo;

import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;

/**
 * 杂志订阅者
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/7 3:53 下午
 */
public class MagazineSubscriber implements Flow.Subscriber<Integer> {

    public static final String JACK = "Jack";
    public static final String PETE = "Pete";

    private static final Logger log = LoggerFactory.
            getLogger(MagazineSubscriber.class);

    private final long sleepTime;
    private final String subscriberName;
    private Flow.Subscription subscription;
    private int nextMagazineExpected;
    private int totalRead;

    MagazineSubscriber(final long sleepTime, final String subscriberName) {
        this.sleepTime = sleepTime;
        this.subscriberName = subscriberName;
        this.nextMagazineExpected = 1;
        this.totalRead = 0;
    }

    /**
     * 当订阅时
     */
    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final Integer magazineNumber) {
        if (magazineNumber != nextMagazineExpected) {
            IntStream.range(nextMagazineExpected, magazineNumber).forEach(
                    (msgNumber) ->
                            log("Oh no! I missed the magazine " + msgNumber)
            );
            // Catch up with the number to keep tracking missing ones
            nextMagazineExpected = magazineNumber;
        }
        log("Great! I got a new magazine: " + magazineNumber);
        takeSomeRest();
        nextMagazineExpected++;
        totalRead++;

        log("I'll get another magazine now, next one should be: " +
                nextMagazineExpected);
        subscription.request(1);
    }

    /**
     * 错误时
     */
    @Override
    public void onError(final Throwable throwable) {
        log("我从发布者那里收到错误消息: " + throwable.getMessage());
    }

    /**
     * 完成时
     */
    @Override
    public void onComplete() {
        log("Finally! I completed the subscription, I got in total " +
                totalRead + " magazines.");
    }

    private void log(final String logMessage) {
        log.info("<=========== [" + subscriberName + "] : " + logMessage);
    }

    public String getSubscriberName() {
        return subscriberName;
    }

    private void takeSomeRest() {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

通过Java9 SubmissionPublisher发送数据

我们将使用Java9 SubmissionPublisher类来创建publisher。正如javadoc所述, 当subscribers消费过慢,就像Reactive Streams中的Publisher一样她会阻塞或丢弃数据。在深入理解之前让我们先看代码。

magazineDeliveryExample中我们为两个不同的subscribers设置了两个不同的等待时间, 并且设置了缓存容量maxStorageInPo
步骤如下:

  1. 创建SubmissionPublisher并设置一个标准的线程池(每个subscriber拥有一个线程)
  2. 创建两个subscribers,通过传递变量设置不同的消费时间和不同的名字,以在log中方便区别
    用20个数字的的stream数据集作为数据源以扮演“杂志打印机”,我们调用offer,并传递以下变量:
    a. 提供给subscribers的数据。
    b. 第二和第三个变量是等待subscribers获取杂志的最大时间。
    c. 控制器以处理数据丢弃的情况。这里我们抛出了一个异常,返回false意味着告诉publisher不需要重试。
  3. 当丢弃数据发生时,offer方法返回一个负数,否则将返回publisher的最大容量(以供最慢的subscriber消费),同时打印这个数字。
    5 . 最后我们添加了一个循环等待以防止主进程过早结束。这里一个是等待publisher清空缓存数据,另外等待最慢的subscriber收到onComplete回调信号(close()调用之后)

main()方法中使用不同参数调用以上逻辑三次,以模拟之前介绍的三种不同真是场景。

  1. 消费者消费速度很快,publisher缓存区不会发生问题。
  2. 其中一个消费者速度很慢,以至缓存被填满,然而缓存区足够大以容纳所有所有数据,不会发生丢弃。
  3. 其中一个消费者速度很慢,同时缓存区不够大,这是控制器被出发了多次,subscriber没有收到所有数据。

你还可以尝试其他组合,比如设置MAX_SECONDS_TO_WAIT_WHEN_NO_SPACE为很大的数字,这时offer表象将类似于submit,或者可以尝试将两个消费者速度同时降低(会出现大量丢弃数据)。

package com.weilai.webfluxdemo;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/7 3:54 下午
 */

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReactiveFlowApp {

    /**
     * 杂志数量
     */
    private static final int NUMBER_OF_MAGAZINES = 20;

    /**
     * 没有空间时最多保留秒数
     */
    private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;

    private static final Logger log =
            LoggerFactory.getLogger(ReactiveFlowApp.class);

    public static void main(String[] args) throws Exception {
        final ReactiveFlowApp app = new ReactiveFlowApp();

        // 订户速度很快,在这种情况下缓冲区大小不是那么重要。
        app.magazineDeliveryExample(100L, 100L, 8);

        // 订阅者速度较慢,但​​发布者方面的缓冲区大小足够好,可以保留所有项目,直到它们被拿走为止
//        app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);

        // 缓慢的订阅者,发布者方面的缓冲区大小非常有限,因此,请务必控制缓慢的订阅者
//        app.magazineDeliveryExample(1000L, 3000L, 8);

    }

    /**
     * 杂志交付示例
     * @param sleepTimeJack
     * @param sleepTimePete
     * @param maxStorageInPo
     * @throws Exception
     */
    void magazineDeliveryExample(final long sleepTimeJack,
                                 final long sleepTimePete,
                                 final int maxStorageInPo) throws Exception {
        // 发布者
        final SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPo);

        // jack订阅者
        final MagazineSubscriber jack = new MagazineSubscriber(
                sleepTimeJack,
                MagazineSubscriber.JACK
        );

        // pete订阅者
        final MagazineSubscriber pete = new MagazineSubscriber(
                sleepTimePete,
                MagazineSubscriber.PETE
        );

        // 建立关系
        publisher.subscribe(jack);
        publisher.subscribe(pete);

        log.info("每位订阅者打印20本杂志,出版商可容纳 " + maxStorageInPo + ". 他们有 " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE + " 秒消耗每本杂志.");
        IntStream.rangeClosed(1, 20).forEach((number) -> {
            log.info("提供杂志 " + number + " 给消费者");
            final int lag = publisher.offer(
                    // 数据
                    number,
                    // 超时时间
                    MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
                    // 时间单位
                    TimeUnit.SECONDS,

                    /**
                     * 当发生缓冲区满无法处理时
                     * subscriber订阅者
                     * msg发布数据
                     */
                    (subscriber, msg) -> {
                        // 触发onError事件
                        subscriber.onError(
                                new RuntimeException(((MagazineSubscriber) subscriber).getSubscriberName() + "! 获取杂志太慢并且没有缓冲区储存!将删除杂志数据: " + msg));
                        // 返回false表示不需要重试
                        return false;
                    });
            if (lag < 0) {
                log("丢掉 " + -lag + " 本杂志");
            } else {
                log("最慢的消费者有 " + lag + " 总共要收集的杂志");
            }
        });

        // 阻塞直到所有订阅者都完成为止(此部分可以通过使用latches来改进,但通过这种方式,我们可以使其变得简单)
        while (publisher.estimateMaximumLag() > 0) {
            Thread.sleep(500L);
        }

        // 关闭发布者,在每个订阅者上调用onComplete()方法
        publisher.close();
        // 给最慢的消费者一些时间来唤醒,并注意到它已经完成
        Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
    }

    private static void log(final String message) {
        log.info("===========> " + message);
    }

}

Processor示例

package com.weilai.webflux;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/5 3:40 下午
 */
public class Test2 {

    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        MyProcessor myProcessor = new MyProcessor();

        publisher.subscribe(myProcessor);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println(item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        };

        myProcessor.subscribe(subscriber);

        publisher.submit(1);
        publisher.submit(-1);
        publisher.submit(2);
        publisher.submit(-2);

        Thread.currentThread().join(1000);

    }
}

class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        if (item > 0) {
            this.submit("转发:" + item);
        }
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
}

原文链接https://thepracticaldeveloper.com/2018/01/31/reactive-programming-java-9-flow/

正文到此结束
本文目录