前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

Reactor

作者头像
spilledyear
发布2019-05-15 15:10:17
8230
发布2019-05-15 15:10:17
举报
文章被收录于专栏:小白鼠

Flux.just

从以下两行简单代码入手

代码语言:javascript
复制
Flux<String> fewWords = Flux.just("Hello", "World");
fewWords.subscribe(System.out::println);

Flux.subscribe是一个final方法,如下,最终入参consumer被封装成一个 LambdaSubscriber

代码语言:javascript
复制
public final Disposable subscribe(Consumer<? super T> consumer) {
    Objects.requireNonNull(consumer, "consumer");
    return subscribe(consumer, null, null);
}

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Consumer<? super Subscription> subscriptionConsumer) {
    return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
            completeConsumer,
            subscriptionConsumer));
}

LambdaSubscriber继承关系

内部流程
  1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array
  2. fewWords.subscribe(System.out::println),入参 System.out::println 被封装成一个 LambdaSubscriber ,然后在 FluxArray.subscribe 方法中调用 LambdaSubscriber.onSubscribe 方法(以自己和array为参数,封装一个ArraySubscription对象作为onSubscribe方法的参数) ,即 LambdaSubscriber.onSubscribe(new ArraySubscription<>(LambdaSubscriber, array))
  3. LambdaSubscriber.onSubscribe 方法内部调用 ArraySubscription.request 方法(入参为Long.MAX_VALUE);
  4. ArraySubscription.request 方法内部调用 LambdaSubscriber.onNext/onError/onComplete 方法;
  5. LambdaSubscriber的onNext方法内部调用真正的逻辑;

Flux.map

代码语言:javascript
复制
Flux<String> fewWords = Flux.just("Hello", "World").map(v -> v.toUpperCase());
fewWords.subscribe(System.out::println);
内部流程
  1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array
  2. Flux.map(v -> v.toUpperCase()) 返回一个 FluxMapFuseable 对象,创建 FluxMapFuseable 对象的时候以 FluxArrayFunction 作为参数,即 return onAssembly(new FluxMapFuseable<>(this, mapper)) //这里的this即代表 FluxArray
  3. fewWords.subscribe(System.out::println),即调用 FluxMapFuseable.subscribe 方法,在该方法内调用 source.subscribe 方法(这里的source 即指 FluxArray),此时会传入一个 MapFuseableSubscriber 对象,在创建 MapFuseableSubscriber 时候以 actual(消费者) 和 mapper(map对应的Function入参) 作为入参, 即 source.subscribe(new MapFuseableSubscriber<>(actual, mapper))
  4. FluxArray.subscribe 方法内部调用 MapFuseableSubscriber.onSubscribe 方法(入参为 ArraySubscription ,创建 ArraySubscription 的时候以 MapFuseableSubscriberarray(真实数据) 为入参),即 s.onSubscribe(new ArraySubscription<>(s, array)) //这里的s代表 MapFuseableSubscriber
  5. MapFuseableSubscriber.onSubscribe 方法内部, 赋值 入参 ArraySubscription 为自己的成员变量 s,然后以自己(MapFuseableSubscriber)为入参,调用 actual即LambdaSubscriber.onSubscribe 方法, 即 actual.onSubscribe(this)
  6. LambdaSubscriber.onSubscribe 方法内部调用 (入参)MapFuseableSubscriber.request(Long.MAX_VALUE) 方法;
  7. MapFuseableSubscriber.request 方法内部,调用成员变量s(s前面说过,即ArraySubscription) 即 ArraySubscription.request 方法;
  8. ArraySubscription.request 方法内部,调用成员变量actual的相关方法(onNext/onError/onComplete)。前面对 ArraySubscription 分析过,是在调用 FluxArray.subscribe 方法的时候创建的,创建的时候以 MapFuseableSubscriberarray(真实数据) 为入参,所以这里的 actual即代表MapFuseableSubscriber,所以这里也就是调用 MapFuseableSubscriber 的相关方法(onNext/onError/onComplete);
  9. MapFuseableSubscriber.onNext/onError/onComplete 方法内部,如果不是onNext方法,会直接调用成员变量actual(LambdaSubscriber).onError/onComplete 相关方法;如果是onNext方法,会先执行 mapper的相关逻辑,得到一个结果 v,然后再以 v 为入参,调用成员变量actual(LambdaSubscriber).onNext 方法;【调用成员变量actual(LambdaSubscriber)的相关方法也就是执行真正的消费者逻辑】

Flux.create

代码语言:javascript
复制
Flux.create(new Consumer<FluxSink<String>>() {
    @Override
    public void accept(FluxSink<String> fluxSink) {
        fluxSink.next("发送数据耶");
    }
}).subscribe(System.out::println);

FluxCreate

内部流程
  1. Flux.create 方法发返回一个 FluxCreate 对象,即 return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL))
  2. FluxCreate.subscribe 方法中,先以 actual(消费者)被压策略 创建 BaseSink 对象,这里返回一个 BufferAsyncSink 对象;然后调用 actual.onSubscribe(sink) 方法(这里的sink就是BaseSink对象);然后再 调用 source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink) 方法(这里的source指的是create方法中传入的Function,即 匿名内部类)
代码语言:javascript
复制
public void subscribe(CoreSubscriber<? super T> actual) {
    // (1). 创建BaseSink对象,这里返回一个 BufferAsyncSink 对象
    BaseSink<T> sink = createSink(actual, backpressure);
    
    // (2). 调用消费者的onSubscribe方法
    actual.onSubscribe(sink);

    try {
        // (3). source指的是create方法中传入的 Function,即 匿名内部类
        source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        sink.error(Operators.onOperatorError(ex, actual.currentContext()));
    }
}
  1. actual.onSubscribe(sink) 方法中调用 sink即BufferAsyncSink .request 方法(BufferAsyncSink 继承了 BaseSink 对象,request 方法在 BaseSink 中);
  2. BaseSink.request 方法中调用子类 BufferAsyncSink.onRequestedFromDownstream 方法;
  3. BufferAsyncSink.onRequestedFromDownstream 方法中 调用 BufferAsyncSink.drain 方法;
  4. BufferAsyncSink.drain 方法中,会判断队列中是有有数据,如果有就会执行 actual.onNext 方法,因为此时队列中没有数据,所以会直接返回;
  5. 先以 BufferAsyncSink 为入参创建 SerializedSink 对象,然后以 SerializedSink 为入参,调用 Function(create方法的入参).accept 方法(即执行我们自己写的逻辑),然后调用 入参 SerializedSink.next 方法,即 fluxSink.next("发送数据耶")
  6. SerializedSink.next 方法中调用 BufferAsyncSink.next 方法;
  7. BufferAsyncSink.next 方法中,先将数据放入队列,然后再执行 BufferAsyncSink.drain 方法;注意,因为此时队列中有数据了,所以在接下来的 BufferAsyncSink.drain 方法调用中,会调用 actual.onNext(o) 方法(即执行我们自定义的消费者onNext方法逻辑);

FluxSink

本文参与?腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.05.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体同步曝光计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flux.just
  • Flux.map
  • Flux.create
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com