This page looks best with JavaScript enabled

RxJava's Thread Dispatch

 ·  ☕ 5 min read

简单的使用 RxJava,并结合线程切换:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) {
        Log.d("TAG", "[subscribe]" + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidScheduler.mainThread())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("TAG", "[onSubscribe]" + Thread.currentThread().getName());
            }
            @Override
            public void onNext(Object o) {
                Log.d("TAG", "[onNext]" + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onComplete() {
                Log.d("TAG", "[onComplete]" + Thread.currentThread().getName());
            }
        });

上次已经分析过不带线程的订阅和观察过程,现在直接从 subscribeOnobserverOn 开始分析。由上次得知 Observable.create 创建最后获得的是 ObservableCreate 被观察者对象。

ObservableCreate

先分析 Observable.subscribeOn

1
2
3
4
5
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 这里的 this 是上层的 ObservableCreate 对象,又添加一层封装,返回 ObservableSubscribeOn 对象
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

看下 ObservableSubscribeOn 构造函数:

1
2
3
4
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
   super(source);
   this.scheduler = scheduler;
}

就是保存下 source 和被观察者的 scheduler

到这里,最初的 ObservableOnSubscribe 已经被包装了3层:

ObservableSubscribeOn

然后到执行订阅的地方,事件的发出都是在订阅之后,所以来到 Observable.subscribe 方法,上篇也分析了最终会走到当前被观察者的 subscribeActual 方法,现在的被观察者被包装为了 ObservableSubscribeOn
所以直接到 ObservableSubscribeOn 的 subscribeActual 中:

1
2
3
4
5
6
7
8
public void subscribeActual(final Observer<? super T> s) {
    // 将观察者包装一层
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // 调用观察者的 onSubscribe,这里还没有进行线程切换,所以是发生在当前 Observable 被创建的线程
    s.onSubscribe(parent);
    // 创建一个 SubscribeTask,然后由线程调度器调度执行。这里进行了线程的切换工作
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

这里先将我们自己的观察者 Observer 包装为了 SubscribeOnObserver 对象。

SubscribeOnObserver

然后重要的是创建了一个 SubscribeTask,接着调用 scheduler.scheduDirect 执行。

先看下 SubscribeTask 类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // SubscribeTask 是 ObservableSubscribeOn 的内部类, 所以能直接访问 source 对象
            source.subscribe(parent);
        }
}

SubscribeTask 实现了 Runnable,并在 run() 中调用了被观察者的 Observeable.subscribe(),从而执行我们自己的事件发送代码。

再看 scheduler.scheduDirect() 方法:

1
2
3
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

往下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 创建 worker
    final Worker w = createWorker();
    // 将传入的 Runnable 封装一层,实际还是传入的 Runnable
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 将 worker 和 runnable 包装为 DisposeTask
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // 调用 worker 执行
    w.schedule(task, delay, unit);

    return task;
}

Scheduler 是个抽象类,createWorker 需要子类实现,就选常用的 IoSchedulercreateWorker查看:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final AtomicReference<CachedWorkerPool> pool;

public Worker createWorker() {
    // 创建一个 EventLoopWorker,并传入一个 Worker 缓存池
    return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        // 从缓存中获取一个 Woker
        this.threadWorker = pool.get();
    }

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        //...
        // Runnable 交给 threadWorker 去执行,这里的 Runnable 的 run() 方法中执行的就是我们在被观察者发送事件的代码
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

看下 CachedWorkerPool Worker 缓存池的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class CachedWorkerPool implements Runnable {

    //...

    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            // 如果缓存池不空,就取一个 threadWorker
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
        // 如果空的,就创建一个新的
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

}

继续看 threadWorker.schedulerActual 实现,ThreadWorker没有实现这个方法,看下它的父类 NewThreadWorker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    public NewThreadWorker(ThreadFactory threadFactory) {
        // 构造时获取一个 ScheduledExecutorService 
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        // 还是传入的 runnable 对象
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // 将 decoratedRun 包装为一个新的 runnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        Future<?> f;
        try {
            if (delayTime <= 0) {
                // 线程池中立即执行
                f = executor.submit((Callable<Object>)sr);
            } else {
                // 延迟执行
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }            
        } catch (RejectedExecutionException ex) {
            //...
        }

        return sr;
    }
}

到这里,SubscribeTaskrun() 方法最终会在线程池中被执行,也就是我们在 subscribe 方法中写的发送事件的代码在这里执行。

observerOn 操作

上面的栗子中,是这样的:

1
2
// 指定观察者在 Android Main Thread 接受事件结果
.observeOn(AndroidScheduler.mainThread())

Observable类的 observeOn()方法:

1
2
3
4
5
6
7
8
9
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    //...
    // 将自身包装为新的被观察者对象,因为进行 subscribeOn 时也包装了一层,所以现在一共4层了
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn

ObservableObserveOn 类:

1
2
3
4
5
6
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

就是存一些属性。

然后就是一样的操作,直接来到订阅的地方,ObservableObserveOnsubscribeActual

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        // 如果是当前线程,直接执行上一层的 subsribe
        // 执行最里面的 ObservableSubscribeOn 的 subscribe() 方法
        source.subscribe(observer);
    } else {
        // 创建 worker,栗子中的 scheduler 是 AndroidSchedulers.mainThread()
        Scheduler.Worker w = scheduler.createWorker();
        // 执行subscribe(),这里是在上面提到的 SubscribeTask 的 run() 中执行
        // 将观察者包装为 ObserveOnObserver
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

加上上面的 subscribeOn 操作,观察者已经被包装了2层:

ObserveOnObserver

source.subscribe() 中将会把事件通过 onNext onError onComplete 方法发送出去,所以看下 ObserveOnObserveronNext

1
2
3
4
5
6
7
8
public void onNext(T t) {
    if (sourceMode != QueueDisposable.ASYNC) {
        // 将结果存入队列
        queue.offer(t);
    }
    // 调用 schedule()
    schedule();
}

ObserveOnObserver 的 schdule 方法:

1
2
3
4
5
6
7
8
void schedule() {
    if (getAndIncrement() == 0) {
        // ObserveOnObserver 实现了 Runnable 接口,所以把自己交给 worker 去执行
        // 这里的 worker 由 Android MainThread Schduler 提供,它实际是通过向 Android MainThread 的 Looper 发送 Message 实现的线程切换。
        // 构造 callback 为 this 的 Message 发送到主线程,主线程消费这条消息时,就执行 callback 的 run() 方法,即这里 this 对象 ObserveOnObserver 的 run() 方法 。
        worker.schedule(this);
    }
}

ObserveOnObserver 实现了 Runnable 接口,所以它的 run() 方法将会在主线程调用。

ObserveOnObserver 的 run() 方法:

1
2
3
4
5
6
7
8
public void run() {
    // outputFused 默认为 false
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

ObserveOnObserver 的 drainNormal() 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void drainNormal() {
    
    // 存消息的队列
    final SimpleQueue<T> q = queue;
    // 上层观察者,SubscribeOnObserver
    final Observer<? super T> a = actual;

    for (;;) {
        for (;;) {
            
            T v;
            try {
                // 从队列取消息
                v = q.poll();                
            } catch (Throwable ex) {
                //...
                return;
            }
            //...
            // 调用 SubscribeOnObserver 的 onNext()
            a.onNext(v);
        }
        //...
    }
}

SubscribeOnObserver 的 onNext 没做特别的事情,就是调用原始观察者的 onNext:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }
}

因此 Observer 的 onNext() 就会在 Android 的主线程执行了。其他的 onError onCompleteonNext 类似。

Support the author with
alipay QR Code
wechat QR Code

Yang
WRITTEN BY
Yang
Developer