简单的使用 RxJava,并结合线程切换:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) {
Log.d("TAG", "[subscribe]" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidScheduler.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG", "[onSubscribe]" + Thread.currentThread().getName());
}
@Override
public void onNext(Object o) {
Log.d("TAG", "[onNext]" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
Log.d("TAG", "[onComplete]" + Thread.currentThread().getName());
}
});
|
上次已经分析过不带线程的订阅和观察过程,现在直接从 subscribeOn
和 observerOn
开始分析。由上次得知 Observable.create
创建最后获得的是 ObservableCreate
被观察者对象。
先分析 Observable.subscribeOn
1
2
3
4
5
|
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 这里的 this 是上层的 ObservableCreate 对象,又添加一层封装,返回 ObservableSubscribeOn 对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
|
看下 ObservableSubscribeOn 构造函数:
1
2
3
4
|
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
|
就是保存下 source
和被观察者的 scheduler
。
到这里,最初的 ObservableOnSubscribe 已经被包装了3层:
然后到执行订阅的地方,事件的发出都是在订阅之后,所以来到 Observable.subscribe 方法,上篇也分析了最终会走到当前被观察者的 subscribeActual
方法,现在的被观察者被包装为了 ObservableSubscribeOn
,
所以直接到 ObservableSubscribeOn 的 subscribeActual 中:
1
2
3
4
5
6
7
8
|
public void subscribeActual(final Observer<? super T> s) {
// 将观察者包装一层
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 调用观察者的 onSubscribe,这里还没有进行线程切换,所以是发生在当前 Observable 被创建的线程
s.onSubscribe(parent);
// 创建一个 SubscribeTask,然后由线程调度器调度执行。这里进行了线程的切换工作
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
|
这里先将我们自己的观察者 Observer
包装为了 SubscribeOnObserver
对象。
然后重要的是创建了一个 SubscribeTask
,接着调用 scheduler.scheduDirect
执行。
先看下 SubscribeTask
类:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// SubscribeTask 是 ObservableSubscribeOn 的内部类, 所以能直接访问 source 对象
source.subscribe(parent);
}
}
|
SubscribeTask 实现了 Runnable
,并在 run()
中调用了被观察者的 Observeable.subscribe()
,从而执行我们自己的事件发送代码。
再看 scheduler.scheduDirect()
方法:
1
2
3
|
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
|
往下:
1
2
3
4
5
6
7
8
9
10
11
12
|
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 创建 worker
final Worker w = createWorker();
// 将传入的 Runnable 封装一层,实际还是传入的 Runnable
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将 worker 和 runnable 包装为 DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
// 调用 worker 执行
w.schedule(task, delay, unit);
return task;
}
|
Scheduler 是个抽象类,createWorker
需要子类实现,就选常用的 IoScheduler
的 createWorker
查看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
// 创建一个 EventLoopWorker,并传入一个 Worker 缓存池
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 从缓存中获取一个 Woker
this.threadWorker = pool.get();
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//...
// Runnable 交给 threadWorker 去执行,这里的 Runnable 的 run() 方法中执行的就是我们在被观察者发送事件的代码
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
|
看下 CachedWorkerPool
Worker 缓存池的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
static final class CachedWorkerPool implements Runnable {
//...
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
// 如果缓存池不空,就取一个 threadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 如果空的,就创建一个新的
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
}
|
继续看 threadWorker.schedulerActual
实现,ThreadWorker
没有实现这个方法,看下它的父类 NewThreadWorker
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
public NewThreadWorker(ThreadFactory threadFactory) {
// 构造时获取一个 ScheduledExecutorService
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 还是传入的 runnable 对象
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将 decoratedRun 包装为一个新的 runnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
try {
if (delayTime <= 0) {
// 线程池中立即执行
f = executor.submit((Callable<Object>)sr);
} else {
// 延迟执行
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
} catch (RejectedExecutionException ex) {
//...
}
return sr;
}
}
|
到这里,SubscribeTask
的 run()
方法最终会在线程池中被执行,也就是我们在 subscribe
方法中写的发送事件的代码在这里执行。
observerOn 操作
上面的栗子中,是这样的:
1
2
|
// 指定观察者在 Android Main Thread 接受事件结果
.observeOn(AndroidScheduler.mainThread())
|
Observable类的 observeOn()
方法:
1
2
3
4
5
6
7
8
9
|
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//...
// 将自身包装为新的被观察者对象,因为进行 subscribeOn 时也包装了一层,所以现在一共4层了
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
|
ObservableObserveOn
类:
1
2
3
4
5
6
|
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
|
就是存一些属性。
然后就是一样的操作,直接来到订阅的地方,ObservableObserveOn
的 subscribeActual
:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
// 如果是当前线程,直接执行上一层的 subsribe
// 执行最里面的 ObservableSubscribeOn 的 subscribe() 方法
source.subscribe(observer);
} else {
// 创建 worker,栗子中的 scheduler 是 AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
// 执行subscribe(),这里是在上面提到的 SubscribeTask 的 run() 中执行
// 将观察者包装为 ObserveOnObserver
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
|
加上上面的 subscribeOn 操作,观察者已经被包装了2层:
source.subscribe()
中将会把事件通过 onNext
onError
onComplete
方法发送出去,所以看下 ObserveOnObserver
的 onNext
:
1
2
3
4
5
6
7
8
|
public void onNext(T t) {
if (sourceMode != QueueDisposable.ASYNC) {
// 将结果存入队列
queue.offer(t);
}
// 调用 schedule()
schedule();
}
|
ObserveOnObserver 的 schdule 方法:
1
2
3
4
5
6
7
8
|
void schedule() {
if (getAndIncrement() == 0) {
// ObserveOnObserver 实现了 Runnable 接口,所以把自己交给 worker 去执行
// 这里的 worker 由 Android MainThread Schduler 提供,它实际是通过向 Android MainThread 的 Looper 发送 Message 实现的线程切换。
// 构造 callback 为 this 的 Message 发送到主线程,主线程消费这条消息时,就执行 callback 的 run() 方法,即这里 this 对象 ObserveOnObserver 的 run() 方法 。
worker.schedule(this);
}
}
|
ObserveOnObserver 实现了 Runnable 接口,所以它的 run() 方法将会在主线程调用。
ObserveOnObserver 的 run() 方法:
1
2
3
4
5
6
7
8
|
public void run() {
// outputFused 默认为 false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
|
ObserveOnObserver 的 drainNormal() 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
void drainNormal() {
// 存消息的队列
final SimpleQueue<T> q = queue;
// 上层观察者,SubscribeOnObserver
final Observer<? super T> a = actual;
for (;;) {
for (;;) {
T v;
try {
// 从队列取消息
v = q.poll();
} catch (Throwable ex) {
//...
return;
}
//...
// 调用 SubscribeOnObserver 的 onNext()
a.onNext(v);
}
//...
}
}
|
SubscribeOnObserver 的 onNext 没做特别的事情,就是调用原始观察者的 onNext:
1
2
3
4
5
6
7
8
9
10
11
12
|
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
}
|
因此 Observer 的 onNext() 就会在 Android 的主线程执行了。其他的 onError
onComplete
与 onNext
类似。