分享几个网络请求开发中常用的 flow 方法
一、基本任务
1. 倒计时任务
fun countDownFlow(dispatcher: CoroutineDispatcher, // 指定协程运行的调度器scope: CoroutineScope, // 指定作用域total: Int, // 总计时数onTick: (Int) -> Unit, // 倒计时时执行的任务onStart: (() -> Unit)? = null, // 启动倒计时时执行onFinish: (() -> Unit)? = null // 倒计时结束时执行
): Job {return flow {for (i in total downTo 0) {emit(i)if (i != 0) delay(1000)}}.flowOn(dispatcher).onEach { onTick.invoke(it) }.onStart { onStart?.invoke() }.onCompletion { onFinish?.invoke() }.launchIn(scope)
}
一个简单的倒计时任务,很好理解不做解析。适用于需要执行倒计时任务的场景例如闪屏页中倒计时完成后的页面跳转。要注意的一点是生命周期的管理,如果在别处提前执行了倒计时结束的任务的话记得手动取消这个任务,否则就会重复执行。倒计时间隔写死了1秒,可自行修改或者改为参数传入,附一个调用例子:
viewBinding.countDown.setOnClickListener {countDownJob?.let { it.cancel() }startActivity(Intent(this,MainActivity::class.java))finish()
}
countDownJob=countDownFlow(Dispatchers.Main,lifecycleScope,2,onTick = {viewBinding.countDown.text = "${it.plus(1).toString()} S"},onFinish = {startActivity(Intent(this,MainActivity::class.java))finish()}
)
2. 无限循环任务
fun infiniteLoop(scope: CoroutineScope, // 指定作用域dispatcher: CoroutineDispatcher=Dispatchers.IO, // 指定调度器,默认为IO线程intervalMillis: Long, // 循环间隔action: () -> Unit // 循环任务
): Job {return scope.launch(dispatcher) {while (isActive) {action()delay(intervalMillis) // 指定间隔}}
}
二、异步任务
1. 简单请求
在实现方法之前不妨先想一想我们在开发中通常会遇到哪些异步情景,首先想到的肯定就是最简单也是最基本的异步请求任务了,例如:在子线程中执行耗时请求并在请求完成获取结果后通知主线程更新UI,再细致一些还能加上请求之前弹出loading,请求完成后隐藏loading... 那么此时我们就有了一个大致的请求方法结构了:
suspend fun <T> requestFlowResponse1(before: (() -> Unit)? = null, // 前置准备after: (() -> Unit)? = null, // 后置工作request: suspend () -> Response<T>?, // 请求errorHandler: ((Throwable) -> Unit)? = null, // 错误处理timeout: Long = 10 // 指定超时限制
): Flow<Response<T>?> {val flow = flow {// 设置超时val response = withTimeout(timeout * 1000) {request()}if(response!=null){if (!response.isSuccessful()) {throw Throwable(response.errorMsg)}}else{throw Throwable("无法获取数据")}emit(response)}.flowOn(Dispatchers.IO).onStart {Log.e(TAG, "请求数据1")before?.invoke() // 前置准备,一般是弹出加载框}// 请求完成:成功/失败.onCompletion {if(it==null){Log.e(TAG, "请求完成1")after?.invoke()}else{Log.e(TAG, "请求异常1")}}// 捕获异常.catch { e ->Log.e(TAG, "请求异常1")e.printStackTrace()errorHandler?.invoke(e)}return flow
}
在这里我根据接口文档定义了一个响应体对象结构以方便异步方法兼容更多的情景,其余部分还可以根据实际情况自行修改/优化
public class Response<T> {public T data = null;public int code = 0;public String message = "";public boolean isSuccessful(){return code==200;}@Overridepublic String toString() {return "Response{" +"code=" + code +", message='" + message + '\'' +", data=" + data +'}';}
}
2. 复合请求
某些时候我们会遇到稍微复杂一些的情况,例如我们需要请求多个接口之后再根据获取的结果进行下一步处理,这时我们就可以利用 combine 方法把多个请求结合起来
suspend fun <T,R> requestFlowConcurrent1(requests: List<suspend () -> Response<T>?>,requestBefore: (() -> Unit)? = null,requestAfter: (() -> Unit)? = null,requestErrorHandler: ((Throwable) -> Unit)? = null,before: (() -> Unit)? = null,after: (() -> Unit)? = null,errorHandler:((Throwable)->Unit)? = null,parser:(Array<Response<T>?>) -> R?
) : R? {var result:R? = nullval flows = requests.map { request ->requestFlowResponse1(requestBefore, requestAfter, request, requestErrorHandler)}combine(flows){ results ->Log.e(TAG, "执行数据处理2")for ((index, baseResponse) in results.withIndex()) {Log.i(TAG, "$index : $baseResponse" )}parser.invoke(results)}.flowOn(Dispatchers.IO).onStart {Log.e(TAG, "任务启动2")before?.invoke()}.onCompletion {if(it==null){Log.e(TAG, "任务完成2")after?.invoke()}else{Log.e(TAG, "任务异常2")}}.flowOn(Dispatchers.Main).catch {Log.e(TAG, "任务异常2")errorHandler?.invoke(it)}.collect {Log.e(TAG, "输出结果2: ${it.toString()}", )result = it}return result
}
3. 简单多任务
再次扩展一下,让我们把他改造为不仅仅局限于网络请求而是适用于其他所有的耗时任务的方法,例如:我需要先从网络请求获取某个数据然后再从数据库中取出某个数据最后再把做运算然后切回主线程更新UI。这时因为从数据库或是其他方式取出来的对象不符合之前定义 Response 对象所以原本的简单请求方法也就不适用了,我们需要稍微改造一下:
suspend fun <T:Any?> requestFlowResponse2(before: (() -> Unit)? = null, // 前置准备after: (() -> Unit)? = null, // 后置工作request: suspend () -> T, // 请求errorHandler: ((Throwable) -> Unit)? = null, // 错误处理timeout: Long = 10
): Flow<T> {val flow = flow {// 设置超时val response = withTimeout(timeout * 1000) {request()}if(response!=null){emit(response)}else{throw Exception("无法获取数据")}}.flowOn(Dispatchers.IO).onStart {Log.e(TAG, "请求数据3")before?.invoke() // 前置准备,一般是弹出加载框}// 请求完成:成功/失败.onCompletion {if(it==null){Log.e(TAG, "请求成功3")after?.invoke()}else{Log.e(TAG, "请求异常3")}}.flowOn(Dispatchers.Main)// 捕获异常.catch { e ->Log.e(TAG, "请求错误3")e.printStackTrace()errorHandler?.invoke(e)}return flow
}
一般情况下三个耗时任务已经能够满足大部分的使用情景了
suspend fun <L:Any?, X:Any?, T:Any?, R> requestFlowConcurrent2(request1: suspend () -> L,request2: (suspend () -> X?)?=null,request3: (suspend () -> T?)?=null,requestBefore: (() -> Unit)? = null,requestAfter: (() -> Unit)? = null,requestErrorHandler: ((Throwable) -> Unit)? = null,before: (() -> Unit)? = null,after: (() -> Unit)? = null,errorHandler:((Throwable)->Unit)? = null,parser: (L, X?, T?)->R?
):R? {var result:R? = nullval flow1 = requestFlowResponse2(requestBefore, requestAfter, request1, requestErrorHandler)val flow2 = if(request2!=null){requestFlowResponse2(requestBefore, requestAfter, request2, requestErrorHandler)}else{flow { emit(null) }}val flow3 = if(request3!=null){requestFlowResponse2(requestBefore, requestAfter, request3, requestErrorHandler)}else{flow { emit(null) }}combine(flow1,flow2,flow3){ l,x,t ->Log.i(TAG, "执行数据处理4\nl:$l \nx:$x \nt:$t" )parser.invoke(l,x,t)}.flowOn(Dispatchers.IO).onStart {Log.e(TAG, "任务启动4")before?.invoke()}.onCompletion {if(it==null){Log.e(TAG, "任务完成4")after?.invoke()}else{Log.e(TAG, "任务异常4")}}.flowOn(Dispatchers.Main).catch {Log.e(TAG, "任务异常4")errorHandler?.invoke(it)}.collect {Log.e(TAG, "输出结果4: ${it.toString()}", )result = it}return result
}
模拟使用例子:
从本地数据库获取用户列表,从网络请求获取热键列表,再将用户数量和热键的数量相加输出最终结果
viewModelScope.launch {val data = requestFlowConcurrent4<MutableList<User>?, BaseResponse<MutableList<Hotkey>>,Unit,Int>(request1 = {userService.getLocalUsers()},request2 = {userService.getHotkey()},request3 = null,requestBefore = {loge("请求开始")},requestAfter = {loge("请求完成")},requestErrorHandler = {loge("请求失败:${it.message}")},before = {loge("任务启动")},after = {loge("任务完成")},errorHandler = {loge("任务失败:${it.message}")},parser = { l,x,t ->var a = 0;var b = 0;l?.let { a=it.size }x?.let {if(it.isSuccessful){b=it.data.size}}return@requestFlowConcurrent4 a+b})data?.let { total.postValue(it) }}