最近使用 Android Jetpack 中 WorkManager 组件做了一个上报一些做分析用途数据的需求, 用着感觉挺香的. 于是想看下其内部的实现原理.
背景
使用 WorkManager 的原因在于 Android 系统在每个版本上都对 App 运行后台任务加了限制, 不同版本有不同的限制, 其目的都是为了节省用户设备的电量消耗. 参考: https://developer.android.com/guide/background .
WorkManager 的功能
https://developer.android.com/topic/libraries/architecture/workmanager
-
当用户设备满足一些条件的情况下才执行的任务, 比如用户手机使用的是无线网络,电量充足,储存空间充足的情况下才执行某些任务.
-
向后兼容到 API 14
- API 23+ 使用 JobScheduler
- API 14-22 结合 BroadcastReceiver 和 AlarmManager
-
可以为任务添加约束, 例如无线网络可用或充电状态下才执行
-
可以提交一次性或定期任务
-
可以监视和管理提交的任务
-
可以将多个任务链接在一起(并行、串行、组合)
-
即使应用或设备重启也能保证任务的执行
-
类似 Doze 模式一样节电
-
运行在非主线程
简单例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// 实现 Worker
class UploadWorker(appContext: Context, workerParams: WorkerParameters) : Worker(appContext, workerParams) {
override fun doWork(): Result {
// 这里运行在非主线程
uploadImages()
// 返回任务执行成功、失败、重试、取消
return Result.success()
}
}
// 创建约束条件
val constraints = Constraints.Builder()
.setRequiresBatteryNotLow(true) // 电量不低
.setRequiredNetworkType(NetworkType.CONNECTED) // 连接了网络
.setRequiresCharging(true) // 充电中
.setRequiresStorageNotLow(true) // 储存空间不低
.setRequiresDeviceIdle(true) // 设备空闲中
.build()
// 定义输入数据
// 输入数据通过 key value 匹配, key 为 String, value 为基本数据类型和 String
val imageData = workDataOf(Constants.KEY_IMAGE_URI to imageUriString)
// 创建请求
val request = OneTimeWorkRequestBuilder<UploadWorker>()
.setInputData(imageData) // 输入数据
.setConstraints(constraints) // 约束条件
.setBackoffCriteria( // 重试任务时的策略
BackoffPolicy.LINEAR,
OneTimeWorkRequest.MIN_BACKOFF_MILLIS,
TimeUnit.MILLISECONDS)
.build()
//提交任务
WorkManager.getInstance(context).enque(request);
|
实现原理
在实现原理上, 这里先提出几个问题:
- 如何确保任务一定会被执行, 即使在应用重启或手机重启之后
- 如何确保是在非主线程运行的
- 如何去匹配我们创建请求时提交的约束的
- 如何监控和管理任务
整体流程
从上图中可以看出一个 WorkRequest 被提交之后的流程:
- WorkRequest 的信息会经 Internal TaskExecutor 储存到数据库
- 当满足约束条件时, WorkFactory 从数据库拿出 WorkRequest 的信息构造出 Worker, 然后在 Executor 中执行 Worker 的
doWork
方法.
有几个组件:
- WorkRequest: 一个接口, 定义了 Worker 的相关信息都在这个里面, 有两个实现类
OneTimeWorkRequest
和 PeriodicWorkRequest
分别对应一次性任务和周期任务
- Internal TaskExecutor: WorkManager 内部的线程池, 用来执行将提交的 WorkRequest 储存到数据库的动作
- WorkerFactory: 根据 WorkRequest 里的信息创建 Worker 实例的工厂类
- Worker: 我们自己实现的 Woker, 例子中的
UploadWorker
- Executor: 执行 Worker
doWork
方法的线程池, 默认是调用 Executors.newFixedThreadPool
创建的线程池, 也可以自己配置这个线程池
WorkManager 的初始化
WorkManger 将会是一个单例对象, 但是从例子代码中没有看的调用初始化的地方. 通过文档可知 WorkManager 有两种初始化方式:
- 应用启动之后, 它自己自动初始化
- 按需初始化, 到了需要用到的地方才初始化. 可以避免初始化影响应用的启动速度
自动初始化
利用 ContentProvider, ContentProvider 会在应用启动之后调用 onCreate
方法, 所以自动初始化就是在 onCreate
中执行 WorkManager 的初始化
1
2
3
4
5
6
7
8
|
public class WorkManagerInitializer extends ContentProvider {
@Override
public boolean onCreate() {
// Initialize WorkManager with the default configuration.
WorkManager.initialize(getContext(), new Configuration.Builder().build());
return true;
}
}
|
按需初始化
因为 WorkManagerInitializer 这个 ContentProvider 会被 WorkManager 库注册到应用的 manifest.xml 中, 所以如果要使用按需初始化, 需要主动移除这个 ContentProvider 并让 Application 实现 Configuration.Provider 接口:
1
2
3
4
5
|
<provider
android:name="androidx.work.impl.WorkManagerInitializer"
android:authorities="${applicationId}.workmanager-init"
tools:node="remove"
android:exported="false" />
|
1
2
3
4
5
6
7
|
public final class MApp extends Application implements Configuration.Provider {
@Override
public Configuration getWorkManagerConfiguration() {
return new Configuration.Builder().build();
}
}
|
这样之后, 就不会在应用启动时调用 WorkManagerInitializer 的 onCreate 了. 但是每次获取实例就需要使用 getInstance(Context ctx)
带参数的那个, 因为要在 getInstance
里做初始化.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public static @NonNull WorkManagerImpl getInstance(@NonNull Context context) {
synchronized (sLock) {
WorkManagerImpl instance = getInstance();
if (instance == null) {
Context appContext = context.getApplicationContext();
if (appContext instanceof Configuration.Provider) {
initialize(appContext, ((Configuration.Provider) appContext).getWorkManagerConfiguration());
instance = getInstance(appContext);
} else {
}
}
return instance;
}
}
|
初始化
初始化工作的 initialize
方法里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {
synchronized (sLock) {
if (sDelegatedInstance == null) {
context = context.getApplicationContext();
if (sDefaultInstance == null) {
// 创建单例
sDefaultInstance = new WorkManagerImpl(
context,
configuration,
new WorkManagerTaskExecutor(configuration.getTaskExecutor()));
}
sDelegatedInstance = sDefaultInstance;
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public WorkManagerImpl(
@NonNull Context context,
@NonNull Configuration configuration,
@NonNull TaskExecutor workTaskExecutor,
boolean useTestDatabase) {
Context applicationContext = context.getApplicationContext();
// 创建数据库
WorkDatabase database = WorkDatabase.create(applicationContext, configuration.getTaskExecutor(), useTestDatabase);
Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
// 创建 schedulers
List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);
// 创建 processor
Processor processor = new Processor(
context,
configuration,
workTaskExecutor,
database,
schedulers);
internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
}
|
创建任务
通过例子中的代码能看到最终组装出来的是一个 WorkRequest 对象, 那么这个对象其实就是定义了我们要执行的任务信息, 每个组装信息都能通过 Builder 构建.
WorkRequest
WorkRequest 的 Buidler 是一个抽象类, 一次性和周期任务分别实现了自己的 Buidler.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// WorkRequest.Builder
public abstract static class Builder<B extends Builder, W extends WorkRequest> {
boolean mBackoffCriteriaSet = false;
UUID mId;
WorkSpec mWorkSpec;
Set<String> mTags = new HashSet<>();
Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
mId = UUID.randomUUID(); // 分配一个 uuid
mWorkSpec = new WorkSpec(mId.toString(), workerClass.getName()); // 创建 WorkSpec
addTag(workerClass.getName()); // 传入的 Worker 的 className 作为一个 tag
}
public final W build() {
W returnValue = buildInternal(); // 子类实现 buildInternal 方法
// Create a new id and WorkSpec so this WorkRequest.Builder can be used multiple times.
mId = UUID.randomUUID();
mWorkSpec = new WorkSpec(mWorkSpec);
mWorkSpec.id = mId.toString();
return returnValue;
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// OneTimeWorkRequest.Builder
public static final class Builder extends WorkRequest.Builder<Builder, OneTimeWorkRequest> {
public Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
super(workerClass);
// 为 inputMergerClassName 单独赋个值
mWorkSpec.inputMergerClassName = OverwritingInputMerger.class.getName();
}
@Override
OneTimeWorkRequest buildInternal() {
if (mBackoffCriteriaSet
&& Build.VERSION.SDK_INT >= 23
&& mWorkSpec.constraints.requiresDeviceIdle()) {
throw new IllegalArgumentException("Cannot set backoff criteria on an idle mode job");
}
return new OneTimeWorkRequest(this);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// PeriodicWorkRequest.Builder
public static final class Builder extends WorkRequest.Builder<Builder, PeriodicWorkRequest> {
public Builder(
@NonNull Class<? extends ListenableWorker> workerClass,
long repeatInterval,
@NonNull TimeUnit repeatIntervalTimeUnit) {
super(workerClass);
// 周期任务的间隔执行时间
mWorkSpec.setPeriodic(repeatIntervalTimeUnit.toMillis(repeatInterval));
}
PeriodicWorkRequest buildInternal() {
if (mBackoffCriteriaSet
&& Build.VERSION.SDK_INT >= 23
&& mWorkSpec.constraints.requiresDeviceIdle()) {
throw new IllegalArgumentException("Cannot set backoff criteria on an idle mode job");
}
return new PeriodicWorkRequest(this);
}
}
|
WorkSpec
查看 WorkSpec 的定义能发现其利用 Room 实现的数据库字段和类字段的映射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public class WorkSpec {
@ColumnInfo(name = "id") @PrimaryKey @NonNull public String id;
@ColumnInfo(name = "state") @NonNull public WorkInfo.State state = ENQUEUED;
@ColumnInfo(name = "worker_class_name") @NonNull public String workerClassName;
@ColumnInfo(name = "input_merger_class_name") public String inputMergerClassName;
@ColumnInfo(name = "input") @NonNull public Data input = Data.EMPTY;
@ColumnInfo(name = "output") @NonNull public Data output = Data.EMPTY;
@ColumnInfo(name = "initial_delay") public long initialDelay;
@ColumnInfo(name = "interval_duration") public long intervalDuration;
@ColumnInfo(name = "flex_duration") public long flexDuration;
@Embedded @NonNull public Constraints constraints = Constraints.NONE;
@ColumnInfo(name = "run_attempt_count") @IntRange(from = 0) public int runAttemptCount;
@ColumnInfo(name = "backoff_policy") @NonNull public BackoffPolicy backoffPolicy = BackoffPolicy.EXPONENTIAL;
@ColumnInfo(name = "backoff_delay_duration") public long backoffDelayDuration = WorkRequest.DEFAULT_BACKOFF_DELAY_MILLIS;
}
|
在创建任务的时候, 每个任务会:
- 分配一个 uuid
- 创建一个 WorkSpec
- 添加 Worker 的类名为 tag
- 一次性任务设置 inputMergerClassName
- 周期任务设置周期执行的间隔时间
提交任务
看下第一个问题: 如何确保任务一定会被执行, 即使在应用重启或手机重启之后.
在应用重启或手机重启之后依然能执行提交的任务, 那说明任务肯定需要存储到磁盘的. 看流程图中也能看出有数据库的参与, 带着这个猜想看下源码. 任务的提交从调用 enqueue
开始:
1
2
|
WorkManager.getInstance(context).enqueue(request);
// getInstance 返回的是 WorkManagerImpl 单列对象
|
1
2
3
4
5
6
7
8
|
//WorkManager#enqueue(WorkRequest)
public final Operation enqueue(@NonNull WorkRequest workRequest) {
// 单个 workReeust 会被装到 List 中, 后续方法都接收 List<WorkReuest>
return enqueue(Collections.singletonList(workRequest));
}
//最终调用到抽象方法 enqueue, 也就是调用 WorkManagerImpl 的实现
public abstract Operation enqueue(@NonNull List<? extends WorkRequest> requests);
|
1
2
3
4
5
6
7
8
|
//WorkManagerImpl#enqueue
public Operation enqueue(List<? extends WorkRequest> workRequests) {
if (workRequests.isEmpty()) {
throw new IllegalArgumentException("enqueue needs at least one WorkRequest.");
}
// 后续工作交给 WorkContinuationImpl
return new WorkContinuationImpl(this, workRequests).enqueue();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//WorkContinuationImpl#enqueue
public Operation enqueue() {
// 防止重复提交, mEnqueued 会在 EnqueueRunnable 中被标记为 true
if (!mEnqueued) {
EnqueueRunnable runnable = new EnqueueRunnable(this);
// EnqueueRunnable 被提交到线程池执行
mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(runnable);
mOperation = runnable.getOperation();
} else {
Logger.get().warning(TAG, String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
}
return mOperation;
}
|
EnqueueRunnable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
//EnqueueRunnable#run
public void run() {
try {
// 判断 Worker 是否有循环引用
if (mWorkContinuation.hasCycles()) {
throw new IllegalStateException(String.format("WorkContinuation has cycles (%s)", mWorkContinuation));
}
// 储存到数据库, 并返回是否需要执行
boolean needsScheduling = addToDatabase();
if (needsScheduling) {
// 需要执行:
// 启用 RescheduleReceiver: 一个广播接收器
final Context context = mWorkContinuation.getWorkManagerImpl().getApplicationContext();
PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);
// 在后台执行 work
scheduleWorkInBackground();
}
mOperation.setState(Operation.SUCCESS);
} catch (Throwable exception) {
mOperation.setState(new Operation.State.FAILURE(exception));
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
//EnqueueRunnable#addToDatabase
public boolean addToDatabase() {
WorkManagerImpl workManagerImpl = mWorkContinuation.getWorkManagerImpl();
WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
// 开始一个事务
workDatabase.beginTransaction();
try {
boolean needsScheduling = processContinuation(mWorkContinuation);
workDatabase.setTransactionSuccessful();
return needsScheduling;
} finally {
// 关闭事务
workDatabase.endTransaction();
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// addToDatabase 经过几个调用
// EnqueueRunnable#processContinuation
// EnqueueRunnable#enqueueContinuation
// 最终会走到: EnqueueRunnable#enqueueWorkWithPrerequisites
private static boolean enqueueWorkWithPrerequisites(
WorkManagerImpl workManagerImpl,
@NonNull List<? extends WorkRequest> workList,
String[] prerequisiteIds,
String name,
ExistingWorkPolicy existingWorkPolicy) {
boolean needsScheduling = false;
// prerequisiteIds 这里会为空, 与 WorkContinuation 的构造函数的 parent 参数有关
//...
for (WorkRequest work : workList) {
// 获取 WorkRequest 中的 WorkSpec
WorkSpec workSpec = work.getWorkSpec();
if (hasPrerequisite && !hasCompletedAllPrerequisites) {
//...
} else {
if (!workSpec.isPeriodic()) {
// 如果是周期任务, 则将周期执行的开始时间设为当前时间
workSpec.periodStartTime = currentTimeMillis;
} else {
workSpec.periodStartTime = 0L;
}
}
// 在 Api 23~35 将带有电量和存储空间约束的 Worker 代理给 ConstraintTrackingWorker
if (Build.VERSION.SDK_INT >= WorkManagerImpl.MIN_JOB_SCHEDULER_API_LEVEL
&& Build.VERSION.SDK_INT <= 25) {
tryDelegateConstrainedWorkSpec(workSpec);
} else if (Build.VERSION.SDK_INT <= WorkManagerImpl.MAX_PRE_JOB_SCHEDULER_API_LEVEL
&& usesScheduler(workManagerImpl, Schedulers.GCM_SCHEDULER)) {
tryDelegateConstrainedWorkSpec(workSpec);
}
if (workSpec.state == ENQUEUED) {
needsScheduling = true;
}
// 将 WorkSpace 插入到 WorkSpec 表
workDatabase.workSpecDao().insertWorkSpec(workSpec);
for (String tag : work.getTags()) {
// 将 tag 和 worker 的 uuid 绑定, 插入到 WorkTag 表
workDatabase.workTagDao().insert(new WorkTag(tag, work.getStringId()));
}
if (isNamed) {
// 如果是命名 Worker, 将 name 和 worker 的 uuid 绑定, 插入到 WorkName 表
workDatabase.workNameDao().insert(new WorkName(name, work.getStringId()));
}
}
return needsScheduling;
}
|
整体流程图
执行任务
//TODO