众所周知 RxJava 是基于观察者模式的响应式编程框架。其中主要有2个主要对象:
- Observable 被观察者
- Observer 观察者
不带线程切换的基本用法:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 | Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onComplete();
    }
});
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Integer o) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
};
observable.subscribe(observer);
 | 
 
好了,一般创建被观察者都是使用静态方法 Observable.create 传递一个 ObservableOnSubscribe 对象。
来,看  Observable 的 create 方法:
| 1
2
3
4
 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
 | 
 
查看参数 ObservableOnSubscribe 类:
| 1
2
3
4
5
6
 | public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.     
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
 | 
 
它就是一个接口,有一个 subscribe 方法需要具体实现。
再看 RxJavaPlugins.onAssembly (onAssembly 上面栗子的情况下,没有特殊作用,传什么对象,返回什么对象):
| 1
2
3
4
5
6
7
 | public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
 | 
 
看 onAssembly 的参数, ObservableCreate 类:
| 1
2
3
4
5
6
7
8
 | public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ....
}
 | 
 
它继承自 Observable ,并通过构造函数将 ObservableOnSubscribe 保存为自己的属性。
所以在创建 Observable 对象的过程中,进行了1次包装,最终返回了一个 ObservableCreate 被观察者对象:


看 Observable 的 subscribe 方法
主要代码如下:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
 | public final void subscribe(Observer<? super T> observer) {    
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {    
        RxJavaPlugins.onError(e);                
    }
}
 | 
 
subscribe 方法需要接收一个 Observer 对象,看下 Observer:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
 | public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}
 | 
 
是个接口,观察者实例需要实现这几个方法。
然后继续 subscribe 方法:
RxJavaPlugins.onSubscribe(this, observer):
| 1
2
3
4
5
6
7
 | public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}
 | 
 
没有做特别的,直接返回传进去的 Observer。
接着看 subscribeActual:
| 1
 | protected abstract void subscribeActual(Observer<? super T> observer);
 | 
 
唉,需要子类实现。回顾上面 Observable.create 的过程,我们创建的 Observable 实际是 ObservableCreate 对象,所以到 ObservableCreate 里看 subscribeActual方法的具体实现:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 | protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
 | 
 
里面调用了 observer.onSubscribe(parent) 方法。 parent 是一个 CreateEmitter 对象。
看下 CreateEmitter 的源码,发现它实现了 ObservableEmitter 接口和 Disposable 接口,所以它可以多态转为 ObservableEmitter 和 Disposable 实例对象。
继续, observer.onSubscribe(parent) 就是回调观察者的 onSubscribe,并将 CreateEmitter 转为 Disposable 对象传递。
然后 source.subscribe(parent) 是调用被观察者的 subscribe 方法,并将 CreateEmitter 转为 ObservableEmitter 对象传递。
从上面知道 source 就是 ObservableCreate 类保存的 ObservableOnSubscribe 对象,是最上面栗子里创建的被观察者,所以source.subscribe(parent) 就走到了栗子里的代码:
| 1
2
3
4
 | public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onComplete();
    }
 | 
 
这里我调用了 emitter 的 onNext 和 onComplete 方法。看下 CreateEmitter 具体实现:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
 |     static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {                
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError ...");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }
        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
 | 
 
原来 emitter 的 onNext 内会直接调用 观察者的 onNext, emitter 的 onComplete 内直接调用观察者的 onComplete。这里没有涉及线程的切换,所以所有操作都是在同一个线程进行。CreateEmitter 还有其他几个方法:
- onError(Throwable t) 在 emitter 发出 error 事件时调用
- dispose() 取消这次订阅,在 onNext onError onComplete 中都会先判断是否已经 dispose,如果 dispose 了,就不会回调观察者的对应方法。

总结
在不使用 RxJava 的线程切换功能下分析了被观察者的创建和订阅观察者的流程。 主要涉及了 Observable、ObservableOnSubscribe、ObservableCreate、Observer(接口)、CreateEmitter。当被观察者的 subscribe 调用之后,就会走到 ObservableOnSubscribe 的 subscribe,从而触发 emitter 发送事件,emitter 发送事件之后,又由 emitter 调用观察者的对应事件回调方法。