博客 / 詳情

返回

Kotlin之Channel實戰(3)

  • 通道

    • 認識Channel
    • 容量與迭代
    • produce與actor
    • Channel的關閉
    • BroadcastChannel
  • 多路複用

    • 什麼是多路複用
    • 複用多個await
    • 複用多個Channel
    • SelectClause
    • Flow實現多路複用
  • 併發安全

    • 協程的併發工具
    • Mutex
    • Semaphore

    認識Channel

    Channel實際上是一個併發安全的隊列,它可以用來連接協程,實現不同協程的通信。
    image.png
    看個生產者消費者的例子:

      @Test
      fun testChannel() = runBlocking {
          val channel = Channel<Int>()
          // 生產者
          val producer = GlobalScope.launch {
              var i = 1
              while (true) {
                  delay(1000)
                  println("sending $i")
                  channel.send(i++)
              }
          }
          // 消費者
          val consumer = GlobalScope.launch {
              while (true) {
                  val element = channel.receive()
                  println("receive: $element")
              }
          }
          joinAll(producer, consumer)
      }

    生產者每隔一秒生產一個元素,然後立刻被消費者消費掉。

Channel的容量

Channel實際上就是一個隊列,隊列中一定存在緩衝區,那麼一旦這個緩衝區滿了,並且也一直沒有人調用receive並取走元素,send就需要掛起。若故意讓接收端的節奏放慢,發現send總是會掛起,知道receive之後才會繼續往下執行。

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

Channel 默認有一個容量大小RENDEZVOUS,值為0。上面的例子如果調慢消費者的節奏,那麼就會依照消費者的節奏,每當消費者消費一個元素,生產者才會生產一個,send總是會掛起,等待消費者消費。

    @Test
    fun testChannel() = runBlocking {
        val channel = Channel<Int>()
        val start = System.currentTimeMillis()
        // 生產者
        val producer = GlobalScope.launch {
            var i = 1
            while (true) {
                delay(1000)
                println("sending $i, ${System.currentTimeMillis() - start}")
                channel.send(i++)
            }
        }
        // 消費者
        val consumer = GlobalScope.launch {
            while (true) {
                delay(5000)
                val element = channel.receive()
                println("receive: $element, ${System.currentTimeMillis() - start}")
            }
        }
        joinAll(producer, consumer)
    }
sending 1, 1018
receive: 1, 5017
sending 2, 6022
receive: 2, 10021
sending 3, 11024
receive: 3, 15026
...

由於緩衝區默認為0,所以生產者每次不得不等待消費者消費掉元素後再生產。

迭代Channel

Channel本身像序列,在讀取的時候,可以直接獲取一個Channel的iterator。

    @Test
    fun testChannelIterator() = runBlocking {
//        val channel = Channel<Int>()
        val channel = Channel<Int>(Channel.UNLIMITED)
        val start = System.currentTimeMillis()
        // 生產者
        val producer = GlobalScope.launch {
            for (i in 1..5) {
                println("sending $i, ${System.currentTimeMillis() - start}")
                channel.send(i)
            }
        }
        // 消費者
        val consumer = GlobalScope.launch {
            val it = channel.iterator()
            while (it.hasNext()) {
                val element = it.next()
                println("receive: $element, ${System.currentTimeMillis() - start}")
                delay(2000)
            }
        }
        joinAll(producer, consumer)
    }
sending 1, 8
sending 2, 12
sending 3, 12
sending 4, 12
sending 5, 12
receive: 1, 15
receive: 2, 2023
receive: 3, 4026
receive: 4, 6031
receive: 5, 8037

上面就是迭代的方法,把緩衝區設置成UNLIMITED,看到生產者一下子把5個元素生產完發送出來,消費者一個一個按照自己的節奏消費。如果緩衝區還是默認,那麼和上一個例子一樣,還是消費一個後再生產一個。

produce與actor

  • 構造生產者與消費者的便捷方法
  • 可以通過produce方法啓動一個生產者協程,並返回一個ReceiveChannel,其他協程就可以用這個Channel來接收數據了。反過來,可以用actor啓動一個消費者協程。
    看個例子,使用produce創建一個receiveChannel,然後啓動一個協程消費receiveChannel中的元素。

      @Test
      fun testProducer() = runBlocking {
          val receiveChannel = GlobalScope.produce(capacity = 50) {
              repeat(5) {
                  delay(1000)
                  println("produce $it")
                  send(it)
              }
          }
          val consumer = GlobalScope.launch {
              for (i in receiveChannel) {
                  delay(3000)
                  println("consume: $i")
              }
          }
          consumer.join()
      }
    produce 0
    produce 1
    produce 2
    consume: 0
    produce 3
    produce 4
    consume: 1
    consume: 2
    consume: 3
    consume: 4
    
    Process finished with exit code 0

    produce的源碼如下,容量默認為0。所以在上面例子中創建receiveChannel的時候不設置容量,那麼就會變成:生產一個元素,消費一個元素,交替進行。設置了50個容量後,可以一下子產生多個元素。當然,該例子的消費者消費元素的時間是delay 3秒鐘,所以每次delay3秒的期間,生產者(模擬每秒鐘生產1個元素)生產了3個元素。

    public fun <E> CoroutineScope.produce(
      context: CoroutineContext = EmptyCoroutineContext,
      capacity: Int = 0,
      @BuilderInference block: suspend ProducerScope<E>.() -> Unit
    ): ReceiveChannel<E>

    再來看看actor:

    public fun <E> CoroutineScope.actor(
      context: CoroutineContext = EmptyCoroutineContext,
      capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
      start: CoroutineStart = CoroutineStart.DEFAULT,
      onCompletion: CompletionHandler? = null,
      block: suspend ActorScope<E>.() -> Unit
    ): SendChannel<E> 

    用actor可以創建一個sendChannel,然後啓動協程使用sendChannel發送元素。例子:

      @Test
      fun testActor() = runBlocking<Unit> {
          val sendChannel = GlobalScope.actor<Int> {
              while (true) {
                  val element = receive()
                  println("receive: $element")
              }
          }
    
          val producer = GlobalScope.launch {
              for (i in 1..3) {
                  println("send: $i")
                  sendChannel.send(i)
              }
          }
    
          producer.join()
      }

    Channel的關閉

  • produce和actor返回的channel都會隨着對應的協程執行完畢而關閉,也正是如此,Channel才被稱為熱數據流
  • 對於一個Channel,如果調用了它的close方法,會立即停止接收新元素,也就是説,這時它的isClosedForSend會立即返回true。而由於Channel緩衝區的存在,這時候可能還有一些元素沒有被處理完,因此要等緩衝區中所有的元素被讀取之後isClosedForReceive才會返回true。
  • Channel的生命週期最好由主導方來維護,建議由主導的一方實現關閉

      @Test
      fun testClose() = runBlocking<Unit> {
          val channel = Channel<Int>(3)
          val producer = GlobalScope.launch {
              List(3) {
                  channel.send(it)
                  println("send $it")
              }
              channel.close()
              println("close channel. closeForSend: ${channel.isClosedForSend}, closeFoReceive: ${channel.isClosedForReceive}")
          }
          val consumer = GlobalScope.launch {
              for (e in channel) {
                  println("receive: $e")
                  delay(1000)
              }
              println("after consuming. closeForSend: ${channel.isClosedForSend}, closeFoReceive: ${channel.isClosedForReceive}")
          }
          joinAll(producer, consumer)
      }
    send 0
    send 1
    send 2
    receive: 0
    close channel. closeForSend: true, closeFoReceive: false
    receive: 1
    receive: 2
    after consuming. closeForSend: true, closeFoReceive: true
    
    Process finished with exit code 0

    從上面的例子看到,消費者每秒鐘消費一個元素,有1秒的處理時間,這期間生產者把3個元素都send出來然後關閉Channel,這時候剛剛消費了一個元素,所以closeForSend是true,closeForReceive是false,等到消費完畢所有元素後,值都為true。

廣播BroadcastChannel

前面提到,發送端和接收端在Channel中存在一對多的情形,從數據處理本身來講,雖然有多個接收端,但是同一個元素只會被一個接收端讀到。廣播則不然,多個接收端不存在互斥行為。

    @Test
    fun testBroadcast() = runBlocking {
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("#$index received $i")
                }
            }
        }.joinAll()
    }
#2 received 0
#0 received 0
#1 received 0
#1 received 1
#0 received 1
#2 received 1
#1 received 2
#0 received 2
#2 received 2

Process finished with exit code 0

創建一個BroadcastChannel,廣播數據,開啓多個協程訂閲廣播,每個協程都能接收到廣播數據。
Channel可以轉換成BroadcastChannel,如下效果一樣:

    @Test
    fun testBroadcast2() = runBlocking {
        //val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val channel = Channel<Int>()
        val broadcastChannel = channel.broadcast(Channel.BUFFERED)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("#$index received $i")
                }
            }
        }.joinAll()
    }

什麼事多路複用

數據通信系統或者計算機網絡系統中,傳輸媒體的帶寬或容量往往大於傳輸單一信號的需求,為了有效地利用通信線路,希望一個信道同時傳輸多路信號,這就是多路複用技術(Multiplexing)。

複用多個await

兩個API分別從網絡和本地緩存獲取數據,期望哪個先返回就先用哪個做展示。
image.png
看例子:

    private suspend fun CoroutineScope.getUserFromLocal(name: String) = async {
        delay(1000)
        "local $name"
    }

    private suspend fun CoroutineScope.getUserFromRemote(name: String) = async {
        delay(2000)
        "remote $name"
    }

    data class Response<T>(val value: T, val isLocal: Boolean = false)

    @Test
    fun testSelectAwait() = runBlocking<Unit> {
        GlobalScope.launch {
            val localUser = getUserFromLocal("Jack")
            val remoteUser = getUserFromRemote("Mike")
            val userResponse = select<Response<String>> {
                localUser.onAwait { Response(it, true) }
                remoteUser.onAwait { Response(it, false) }
            }
            println("render on UI: ${userResponse.value}")
        }.join()
    }
render on UI: local Jack

Process finished with exit code 0

local方法延遲時間短,先返回了,所以select選取的是local數據。若把remote時間縮短,那麼select就會選擇remote返回的數據。

複用多個Channel

跟await類似,會接收到最快的那個Channel消息。看下面的例子:

    @Test
    fun testSelectChannel() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(100)
        }
        GlobalScope.launch {
            delay(50)
            channels[1].send(50)
        }
        val result = select<Int?> {
            channels.forEach { channel -> channel.onReceive { it } }
        }
        println("result $result")
        delay(1000)
    }
result 50

Process finished with exit code 0

兩個Channel發送數據元素,第一個延遲100毫秒發送,第二個延遲50毫秒發送,用select接收的時候,收到延遲50毫秒發送的那個。

SelectClause

  • 怎麼知道哪些事件可以被select呢?其實所有能被select的事件都是SelectClauseN類型,包括:

    • SelectClause0: 對應事件沒有返回值,例如join沒有返回值,那麼onJoin就是SelectClauseN類型,使用時,onJoin的參數是一個無參函數。
    • SelectClause1: 對應事件有返回值,前面的onAwait和onReceive都是類似情況。
    • SelectClause2: 對應事件有返回值,此外還需要一個額外的參數,例如Channel.onSend有兩個參數,第一個是Channel數據類型的值,表示即將發送的值第二個是發送成功的回調參數
  • 如果想要確認掛起函數時否支持select,只需要查看其是否存在對應的SelectClauseN類型可回調即可。
    看一個無參函數的例子:

      @Test
      fun testSelectClause0() = runBlocking<Unit> {
          val job1 = GlobalScope.launch {
              delay(100)
              println("job 1")
          }
    
          val job2 = GlobalScope.launch {
              delay(10)
              println("job 2")
          }
          select<Unit> {
              job1.onJoin { println("job 1 onJoin") }
              job2.onJoin { println("job 2 onJoin") }
          }
          delay(1000)
      }
    job 2
    job 2 onJoin
    job 1
    
    Process finished with exit code 0

    啓動兩個協程job1和job2,job2延遲少,先打印,所以select中選擇了job2 (打印了job 2 onJoin),因為兩個協程沒有返回值,或者説返回值是Unit,所以在select後面聲明。

下面看一個兩個參數的例子,第一個是值,第二個是回調參數。

    @Test
    fun testSelectClause2() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(10) { sendChannel ->
                        println("sent on $sendChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sendChannel ->
                        println("sent on $sendChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
        delay(1000)
    }
[RendezvousChannel@78aab498{EmptyQueue}, RendezvousChannel@7ee955a8{EmptyQueue}]
10
sent on RendezvousChannel@7ee955a8{EmptyQueue}

Process finished with exit code 0

兩個Channel對象78aab498和7ee955a8,啓動兩個協程,分別延遲10毫秒和100毫秒,分別使用onSend發送數據10和100,第二個參數是回調;
然後啓動兩個協程分別接收兩個Channel對象發送的數據,可以看到結果選擇了較少延遲的那個協程。

再來看上面"是否存在對應的SelectClauseN類型"的意思:

public val onSend: SelectClause2<E, SendChannel<E>>
public val onJoin: SelectClause0

可以看到源碼中,onSend和onJoin都有SelectClauseN接口,所以都支持select。

使用Flow實現多路複用

多數情況下,可以通過構造合適的Flow來實現多路複用的效果。

fun CoroutineScope.getUserFromLocal(name: String) = async {
    delay(1000)
    "local $name"
}

fun CoroutineScope.getUserFromRemote(name: String) = async {
    delay(2000)
    "remote $name"
}

data class Response<T>(val value: T, val isLocal: Boolean = false)

class ChannelTest2 {

    @Test
    fun testSelectFlow() = runBlocking<Unit> {
        // 函數 -> 協程 -> Flow -> Flow合併
        val name = "guest"
        coroutineScope {
            coroutineScope {
                listOf(::getUserFromLocal, ::getUserFromRemote) // 函數
                    .map { function -> function.call(name) }    // 協程
                    .map { deferred ->
                        flow { emit(deferred.await()) }         // Flow
                    }
                    .merge()                                    // Flow 合併
                    .collect { user -> println("collect $user") }
            }
        }
    }
}
collect local guest
collect remote guest

Process finished with exit code 0

如上例,這個和select是有區別的,例子中把兩個操作合併到末端操作符collect上。

併發安全

不安全的併發訪問

使用線程在解決併發問題的時候總是會遇到線程安全的問題,而Java平台上的Kotlin協程實現免不了存在併發調度的情況,因此線程安全同樣值得留意。

    @Test
    fun testUnsafeConcurrent() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch { count++ }
        }.joinAll()
        println(count)
    }
984

Process finished with exit code 0

啓動1000個協程對count進行加一操作,運行多次,可能每次結果都不一樣,有的協程拿到count後加操作還沒完成,就被別的協程進行加操作了。所以同樣需要注意線程安全,把count修改為AtomicInteger,修改如下:

    @Test
    fun testSafeConcurrent() = runBlocking<Unit> {
        var count = AtomicInteger(0)
        List(1000) {
            GlobalScope.launch { count.incrementAndGet() }
        }.joinAll()
        println(count.get())
    }
1000

Process finished with exit code 0

協程的併發工具

除了在線程中常用的解決併發問題的手段外,協程框架也提供了一些併發安全地工具,包括:

  • Channel:併發安全地消息通道。這個已經熟悉。
  • Mutex: 輕量級鎖,它的lock和unlock從語義上與線程鎖比較類似,之所以輕量級是因為它在獲取不到鎖時不會阻塞線程,而是掛起等待鎖的釋放。
  • Semaphore: 輕量級信號量,信號量可以有多個,協程在獲取到信號量後即可執行併發操作。當Semaphore的參數為1時,效果等價於Mutex。
    看下Mutex輕量級鎖的例子:

      @Test
      fun testMutex() = runBlocking<Unit> {
          var count = 0
          val mutex = Mutex()
          List(1000) {
              GlobalScope.launch {
                  mutex.withLock {
                      count++
                  }
              }
          }.joinAll()
          println(count)
      }

    輸出1000,使用mutex,為加操作加上鎖,能保證線程安全。
    再看下Semaphore輕量級信號量的例子:

      @Test
      fun testSemaphore() = runBlocking<Unit> {
          var count = 0
          val semaphore = Semaphore(1)
          List(1000) {
              GlobalScope.launch {
                  semaphore.withPermit {
                      count++
                  }
              }
          }.joinAll()
          println(count)
      }

    加上信號量機制,也能保證線程安全。

避免訪問外部可變狀態

編寫函數的時候,要求它不得訪問外部狀態,只能基於參數做運算,通過返回值提供運算結果。

    @Test
    fun testAvoidAccessOuter() = runBlocking<Unit> {
        val count = 0
        val result = count + List(1000) {
            GlobalScope.async { 1 }
        }.map { it.await() }.sum()
        println(result)
    }

這個是一個比較極端的例子,把count作為外部變量,和協程隔開,也就不存在併發的安全性問題了。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.