定義
協程基於線程,是輕量級的線程
作用
- 處理耗時任務,這種任務常常會阻塞主線程
- 保證主線程安全,即確保安全地從主線程調用任何suspend函數
特點
- 讓異步邏輯同步化
- 最核心的點就是,函數或者一段程序能夠被掛起,稍後再在掛起得位置恢復
掛起函數
- 使用suspend關鍵字修飾的函數
- 掛起函數只能在協程體內或其他掛起函數內調用
掛起和阻塞的區別
- 掛起不會阻塞主線程,主線程可以正常刷新UI,但阻塞就會導致主線程ANR
協程調度器
- Dispatchers.Main:主線程上處理UI交互相關,更新LiveData
- Dispatchers.IO:非主線程,磁盤讀寫和網絡IO
- Dispatchers.Default:非主線程,CPU密集型任務,排序,JSON數據解析等
任務泄漏
- 當某個協程任務丟失,無法追蹤,會導致內存、CPU、磁盤等資源浪費,甚至發送一個無用的網絡請求,這種稱為任務泄漏
- 為了避免,引入了結構化併發機制
結構化併發
- 可以取消任務、追蹤任務、協程失敗時發出錯誤信號
協程作用域CoroutineScope
- 可以追蹤所有協程,也可以取消協程
- GlobalScope:生命週期是Process級別,即使Activity或Fragment已經被銷燬,協程仍然運行
- MainScope:在activity中使用,可以在onDestroy中取消協程
- ViewModelScope:只能在ViewModel中使用,綁定ViewModel生命週期
- lifecycleScope:只能在Activity、Fragment中使用,會綁定Activity、Fragment的生命週期
協程構建器
launch和async構建器都用來啓動新協程
- launch,返回一個Job並且不附帶任何結果
- async,返回一個Deferred,Deferred也是一個Job,可以使用.await()在一個延期的值上得到最終的結果
- launch 是非阻塞的 而 runBlocking 是阻塞的。多個 withContext 任務是串行的, 且withContext 可直接返回耗時任務的結果。 多個 async 任務是並行的,async 返回的是一個Deferred<T>,需要調用其await()方法獲取結果
- runBlocking一般用在測試中,會阻塞當前線程,會等到包裹的子協程都執行完畢才退出
- 事實上await()也不一定導致協程會被掛起,await() 只有在 async 未執行完成返回結果時,才會掛起協程。若 async 已經有結果了,await() 則直接獲取其結果並賦值給變量,此時不會掛起協程
| 構建器 | 是否立即啓動? | 串行?並行? | 是否阻塞當前線程? | 返回結果 |
|---|---|---|---|---|
| launch | 是 | 根據包裹的子協程類型而定 | 否 | Job對象 |
| async | 是 | 任務之間是並行 | 否 | Deferred,可以用await()方法獲取結果 |
| runBlocking | 是 | 根據包裹的子協程類型而定 | 阻塞 | 子協程都執行完畢後才退出 |
| withContext | 不是 | 任務之間是串行 | 否 | 可以直接返回耗時任務結果,協程體最後一行內容 |
doAsync和async
- doAsync 的源碼它的實現都是基於Java的 Future 類進行異步處理和通過Handler進行線程切換 ,從而封裝的一個擴展函數方便線程切換。
- 與 async 關係不大,因為 doAsync並沒有用到協程庫中的東西
- 可以通過 uiThread { } 來切換會主線程
btn.setOnClickListener {
doAsync {
Log.e("TAG", " doAsync... [當前線程為:${Thread.currentThread().name}]")
uiThread {
Log.e("TAG", " uiThread.... [當前線程為:${Thread.currentThread().name}]")
}
}
}
Job對象的生命週期
- 每一個通過launch或者async創建的協程,都會返回一個Job實例,該實例時協程的唯一標識,負責管理協程的生命週期
- 一個任務包含一系列狀態:新創建(New)、活躍(Active)、完成中(Completing)、已完成(Completed)、取消中(Canceling)和已取消(Cancelled)。我們無法直接訪問這些狀態,可以通過訪問Job的屬性:isActive、isCancelled和isCompleted
- 如果協程處於活躍狀態,協程運行出錯或是調用job.cancel(),都會將當前任務置為取消中(Cancelling)狀態(isActive=false,isCancelled=true)。當所有子協程都完成後,協程會進入已取消(Cancelled)狀態,此時isCompleted=true
- 協程完成,可能是正常完成,也可能是被取消了
等待一個作業
由launch啓動的協程用join()方法;用async啓動的協程用await()
@Test
fun `test coroutine join`() = runBlocking {
val job1 = launch {
delay(200)
println("job1 finished")
}
//這樣可以確保job1執行完再執行後面的job2和job3
job1.join()
val job2 = launch {
delay(200)
println("job2 finished")
//返回結果
"job2 result"
}
val job3 = launch {
delay(200)
println("job3 finished")
//返回結果
"job2 result"
}
}
組合併發
@Test
fun `test async`() = runBlocking {
val time = measureTimeMillis {
val one = doOne()
val two = doTwo()
//輸出是30
println("result: ${one + two}")
}
//輸出是2秒多,也就是是串行的
println(time)
}
//併發
@Test
fun `test combine async`() = runBlocking {
val time = measureTimeMillis {
val one = async { doOne() }
val two = async { doTwo() }
//輸出是30
println("result: ${one.await() + two.await()}")
}
//輸出是1秒多,也就是是並行的
println(time)
}
private suspend fun doOne(): Int{
delay(1000)
return 10
}
private suspend fun doTwo(): Int{
delay(1000)
return 20
}
注意async的寫法不能是:
val one = async { doOne() }.await()
val two = async { doTwo() }.await()
這樣起不到併發效果,而是等到one執行完,再執行two
協程的啓動模式
- DEFAULT:協程創建後,立即開始調度,在調度前如果協程被取消,其將直接進去取消響應狀態
- ATOMIC:協程創建後,立即開始調度,協程執行到第一個掛起點之前不響應取消
需要注意的是,立即調度不等於立即執行
- LAZY:只有協程被需要時,包括主動調用協程的start、join或者await等函數時才會開始調度,如果調度前就被取消,那麼該協程將直接進入異常結束狀態
@Test
fun `test start mode`() = runBlocking {
val job = async(start = CoroutineStart.LAZY) {
//
}
//...其他代碼
//啓動協程
job.await()
}
- UNDISPATCHED:協程創建後立即在當前函數調用棧中執行,直到遇到第一個真正的掛起點
@Test
fun `test start mode`() = runBlocking {
val job = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
println("thread:"+ Thread.currentThread().name)
}
}
//上面輸出的線程名字是主線程,因為UNDISPATCHED會立即在當前線程中執行,而runBlocking是在主線程中
協程作用域構建器 coroutineScope、runBlocking、supervisorScope
- runBlocking是常規函數,會阻塞當前線程;coroutineScope是掛起函數,不會阻塞當前線程
- 它們都會等待協程體以及所有子協程結束,一個是阻塞線程等待,一個是掛起等待
協程作用域構建器 coroutineScope、supervisorScope
- coroutineScope,一個協程失敗了,所有其他兄弟協程也會被取消
- supervisorScope,一個子協程失敗了,不會影響其他兄弟協程,但如果是作用域有異常失敗了,則所有的子協程都會失敗退出
coroutineScope和CoroutineScope
- coroutineScope是一個掛起函數,是協程作用域構建器,CoroutineScope()是一個普通函數
- coroutineScope後面的協程作用域的協程上下文是繼承父協程作用域的上下文
- CoroutineScope()有自己的作用域上下文
- 都能夠進行解構化併發,可以很好的管理多個子協程
協程的取消
- 取消作用域會取消它的子協程
- 被取消的子協程不會影響其餘兄弟協程
- 協程通過拋出一個特殊的異常CancellationException來處理取消操作
- 所有kotlinx.coroutines中的掛起函數(withContext、delay等)都是可取消的
- CPU密集型任務無法直接用cancel來取消
CPU密集型任務的取消
- 通過isActive來判斷取消,因為取消的任務isActive為false
- 通過ensureActive()來取消,如果被取消,任務isActive為false,會拋一個異常
- yield函數會檢查所在協程的狀態,如果已經取消,則拋出CancellationException予以響應。此外,它還會嘗試出讓線程的執行權,給其他協程提供執行的機會
協程取消的副作用
- 在finally中釋放資源
@Test
fun `test release resources`() = runBlocking {
var br = BufferedReader(FileReader("xxx"))
with(br){
var line:String?
try {
while (true){
line = readLine() ?: break
println(line)
}
}finally {
//關閉資源
close()
}
}
}
- 用use函數:該函數只能被實現了Closeable的對象使用,程序結束的時候會自動調用close方法,適合文件對象
//use函數在文件使用完畢後會自動調用close函數
BufferedReader(FileReader("xxx")).use {
var line:String?
while (true){
line = readLine() ?: break
println(line)
}
}
不能取消的任務
協程被取消後,finally裏面還有掛起函數,可以用withContext(NonCancellable)
@Test
fun `test cancel with noncancellable`() = runBlocking {
val job = launch {
try {
repeat(1000){
println("job: i'm sleeping $it")
delay(500L)
}
}finally {
//不用withContext(NonCancellable),delay後面的打印不會執行
withContext(NonCancellable){
println("running finally")
delay(1000L)
println("job: noncancellable")
}
}
}
delay(1300)
println("main: waiting")
job.cancelAndJoin()
println("main: i can quit")
}
超時任務
withTimeout()方法可以開啓超時任務,默認超時會拋出異常
/*
* 超時任務
* */
@Test
fun `test deal with timeout`() = runBlocking {
withTimeout(1300){
repeat(1000){
println("job: sleeping $it")
delay(500L)
}
}
}
如果不想拋出異常,可以用withTimeoutOrNull
/*
* 超時任務,超時會返回null,不超時返回最後的done
* */
@Test
fun `test deal with timeout ornull`() = runBlocking {
val result = withTimeoutOrNull(1300){
repeat(1000){
println("job: sleeping $it")
delay(500L)
}
"done"
}
println("result: $result")
}
歡迎關注我的微信公眾號,和我一起每天進步一點點!
-