動態

詳情 返回 返回

Python3 使用 websockets 調用阿里雲實時語音識別(qbit) - 動態 詳情

前言

  • 技術棧

    Python  3.11.8
    websockets  15.0.1
    aliyun-python-sdk-core  2.16.0
    nls  1.0.0
  • 截至 2025.3.13nls.NlsSpeechTranscriber 不支持異步調用
  • 使用 asyncio.runloop.create_task 將異步調用轉化為同步調用
  • 後文中,為保持字節流和字符串的一致性,定義了不同格式的結束符

    b'$END$' client -> server
     '$END$' server -> client

材料準備

  • 從 GitHub 下載 nls 目錄tests 目錄裏面的 test1.pcm 文件
    image.png
  • nls 目錄放到 site-packages 目錄下,相當於安裝 nls

示例代碼

  • server 端代碼(sync 同步版本)

    # encoding: utf-8
    # author: qbit
    # date: 2025-03-13
    # summary: websocket 服務端結束語言識別請求,調用阿里雲返回識別的文字結果
    
    import time
    import json
    import nls
    from websockets.sync.server import serve
    
    URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
    TOKEN = "your_token"  # 參考 https://help.aliyun.com/document_detail/450255.html 獲取token
    APPKEY = "your_key"  # 獲取Appkey請前往控制枱:https://nls-portal.console.aliyun.com/applist
    
    class FuncCallback:
      r"""nls.NlsSpeechTranscriber 需要的回調函數"""
    
      def __init__(self, ):
          self.result = ""
          self.state = ""
    
      def on_sentence_begin(self, message, *args):
          print("on_sentence_begin")
    
      def on_sentence_end(self, message, *args):
          self.result = json.loads(message)["payload"]["result"]
          self.state = "sentence_end"
          print(f"on_sentence_end: {self.result}")
          websocket = args[0]
          websocket.send(self.result)
    
      def on_start(self, message, *args):
          print("on_start")
    
      def on_error(self, message, *args):
          print(f"on_error from aliyun {message}")
          self.state = "error"
    
      def on_close(self, *args):
          print("on_close")
    
      def on_result_chg(self, message, *args):
          self.result = json.loads(message)["payload"]["result"]
          self.state = "result_chg"
          print(f"on_result_chg: {self.result}")
          websocket = args[0]
          websocket.send(self.result)
    
      def on_completed(self, message, *args):
          print("on_completed")
          self.state = "completed"
    
    def asr(websocket):
      r"""語言識別服務,Automatic Speech Recognition (ASR)"""
      print(
          f"Client connected: {websocket.remote_address[0]}:{websocket.remote_address[1]}"
      )
      callback = FuncCallback()
      sr = nls.NlsSpeechTranscriber(
          url=URL,
          token=TOKEN,
          appkey=APPKEY,
          on_sentence_begin=callback.on_sentence_begin,
          on_sentence_end=callback.on_sentence_end,
          on_start=callback.on_start,
          on_result_changed=callback.on_result_chg,
          on_completed=callback.on_completed,
          on_error=callback.on_error,
          on_close=callback.on_close,
          callback_args=[websocket],
      )
      sr.start(
          aformat="pcm",              # 支持格式 pcm, opu, opus
          enable_intermediate_result=True,
          enable_punctuation_prediction=True,
          enable_inverse_text_normalization=True,
      )
      for message in websocket:
          if message == b"$END$":
              break
          sr.send_audio(message)
    
      sr.stop()
    
      while callback.state not in ('completed', 'error'):
          time.sleep(0.1)
          print(f"callback.state: {callback.state}")
      websocket.send("$END$")        # 發送自定義結束標誌
    
    def main():
      host = "127.0.0.1"
      port = 8765
      with serve(asr, host, port) as server:
          print(f"Server started {host}:{port}...")
          server.serve_forever()
    
    if __name__ == "__main__":
      main()
  • client 端代碼

    # encoding: utf-8
    # author: qbit
    # date: 2025-03-13
    # summary: websocket 客户端模擬實時語音請求
    
    import threading
    from websockets.sync.client import connect
    
    def send(websocket):
      r"""
      測試語言識別
      Automatic Speech Recognition (ASR)
      """
      with open("./audio/test1.pcm", "rb") as f:
          data = f.read()
      __slices = zip(*(iter(data),) * 6400)
      for i in __slices:
          websocket.send(bytes(i))
      websocket.send(b"$END$")    # 發送自定義結束標誌
    
    def recv(websocket):
      r"""
      測試語言識別
      Automatic Speech Recognition (ASR)
      """
      while True:
          message = websocket.recv()
          print(f"Received message: {message}")
          if message == "$END$":
              break
    
    if __name__ == "__main__":
      uri = "ws://localhost:8765"
      print(f"Connecting to {uri}...")
      with connect(uri) as websocket:
          t1 = threading.Thread(target=send, args=(websocket,))
          t2 = threading.Thread(target=recv, args=(websocket,))
          t1.start()
          t2.start()
          t1.join()
          t2.join()

運行與輸出

  • 先啓動 server 代碼,再啓動 client 代碼
  • server 輸出

    Server started 127.0.0.1:8765...
    Client connected: 127.0.0.1:53387
    on_start
    on_sentence_begin
    on_result_chg: 因
    on_result_chg: 一
    on_result_chg: 12
    on_result_chg: 123
    on_result_chg: 1234
    on_result_chg: 12345
    on_result_chg: 123456
    on_result_chg: 1234567
    on_result_chg: 12345678
    on_result_chg: 1 2 3 4 5 6 7 8 9 10
    on_sentence_end: 1 2 3 4 5 6 7 8 9 10。
    on_completed
    on_close
  • client 輸出

    Connecting to ws://localhost:8765...
    Received message: 因
    Received message: 一
    Received message: 12
    Received message: 123
    Received message: 1234
    Received message: 12345
    Received message: 123456
    Received message: 1234567
    Received message: 12345678
    Received message: 1 2 3 4 5 6 7 8 9 10
    Received message: 1 2 3 4 5 6 7 8 9 10。
    Received message: $END$

async 異步版 Server 代碼

  • 本節的異步版本代碼,可以與 fastapi.WebSocket 結合使用
  • asyncio.run 版本

    # encoding: utf-8
    # author: qbit
    # date: 2025-03-13
    # summary: websocket 服務端結束語言識別請求,調用阿里雲返回識別的文字結果
    
    import json
    import nls
    import asyncio
    from websockets.asyncio.server import serve
    
    URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
    TOKEN = "your_token"  # 參考 https://help.aliyun.com/document_detail/450255.html 獲取token
    APPKEY = "your_key"  # 獲取Appkey請前往控制枱:https://nls-portal.console.aliyun.com/applist
    
    class FuncCallback:
      r"""nls.NlsSpeechTranscriber 需要的回調函數"""
      def __init__(self, ):
          self.result = ""
          self.state = ""
    
      def on_sentence_begin(self, message, *args):
          print("on_sentence_begin")
    
      def on_sentence_end(self, message, *args):
          self.result = json.loads(message)["payload"]["result"]
          self.state = "sentence_end"
          print(f"on_sentence_end: {self.result}")
          websocket = args[0]
          asyncio.run(websocket.send(self.result))    # 將異步調用轉化為同步調用
    
      def on_start(self, message, *args):
          print("on_start")
    
      def on_error(self, message, *args):
          print(f"on_error from aliyun {message}")
          self.state = "error"
    
      def on_close(self, *args):
          print("on_close")
    
      def on_result_chg(self, message, *args):
          self.result = json.loads(message)["payload"]["result"]
          self.state = "result_chg"
          print(f"on_result_chg: {self.result}")
          websocket = args[0]
          asyncio.run(websocket.send(self.result))    # 將異步調用轉化為同步調用
    
      def on_completed(self, message, *args):
          print("on_completed")
          self.state = "completed"
    
    async def asr(websocket):
      callback = FuncCallback()
      sr = nls.NlsSpeechTranscriber(
          url=URL,
          token=TOKEN,
          appkey=APPKEY,
          on_sentence_begin=callback.on_sentence_begin,
          on_sentence_end=callback.on_sentence_end,
          on_start=callback.on_start,
          on_result_changed=callback.on_result_chg,
          on_completed=callback.on_completed,
          on_error=callback.on_error,
          on_close=callback.on_close,
          callback_args=[websocket],
      )
      sr.start(
          aformat="pcm",
          enable_intermediate_result=True,
          enable_punctuation_prediction=True,
          enable_inverse_text_normalization=True,
      )
      async for message in websocket:
          if message == b"$END$":
              break
          sr.send_audio(message)
    
      sr.stop()
    
      while callback.state not in ("completed", "error"):
          await asyncio.sleep(0.1)
          print(f"callback.state: {callback.state}")
    
      await websocket.send("$END$")  # 發送自定義結束標誌
    
    async def main():
      host = "127.0.0.1"
      port = 8765
      async with serve(asr, host, port) as server:
          print(f"Server started {host}:{port}...")
          await server.serve_forever()
    
    if __name__ == "__main__":
      asyncio.run(main())
  • 因為 asyncio.run 新開了事件循環有些重,花了不少時間弄出下面的 loop.create_task 版本

    # encoding: utf-8
    # author: qbit
    # date: 2025-03-14
    # summary: websocket 服務端結束語言識別請求,調用阿里雲返回識別的文字結果
    
    import json
    import nls
    import asyncio
    from websockets.asyncio.server import serve
    
    URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
    TOKEN = "your_token"  # 參考 https://help.aliyun.com/document_detail/450255.html 獲取token
    APPKEY = "your_key"  # 獲取Appkey請前往控制枱:https://nls-portal.console.aliyun.com/applist
    
    class FuncCallback:
      r"""nls.NlsSpeechTranscriber 需要的回調函數"""
    
      def __init__(self):
          self.result = ""
          self.state = ""
    
      def on_sentence_begin(self, message, *args):
          print("on_sentence_begin")
    
      def on_sentence_end(self, message, *args):
          self.result = json.loads(message)["payload"]["result"]
          self.state = "sentence_end"
          print(f"on_sentence_end: {self.result}")
          websocket = args[0]
          loop: asyncio.windows_events.ProactorEventLoop = args[1]
          loop.create_task(websocket.send(self.result))
    
      def on_start(self, message, *args):
          print("on_start")
    
      def on_error(self, message, *args):
          print(f"on_error from aliyun {message}")
          self.state = "error"
    
      def on_close(self, *args):
          print("on_close")
    
      def on_result_chg(self, message, *args):
          self.result = json.loads(message)["payload"]["result"]
          self.state = "result_chg"
          print(f"on_result_chg: {self.result}")
          websocket = args[0]
          loop: asyncio.windows_events.ProactorEventLoop = args[1]
          loop.create_task(websocket.send(self.result))
    
      def on_completed(self, message, *args):
          print("on_completed")
          self.state = "completed"
    
    async def asr(websocket):
      callback = FuncCallback()
      # loop = asyncio.get_running_loop()
      loop = asyncio.get_event_loop()
      sr = nls.NlsSpeechTranscriber(
          url=URL,
          token=TOKEN,
          appkey=APPKEY,
          on_sentence_begin=callback.on_sentence_begin,
          on_sentence_end=callback.on_sentence_end,
          on_start=callback.on_start,
          on_result_changed=callback.on_result_chg,
          on_completed=callback.on_completed,
          on_error=callback.on_error,
          on_close=callback.on_close,
          callback_args=[websocket, loop],
      )
      sr.start(
          aformat="pcm",  # 支持格式 pcm, opu, opus
          enable_intermediate_result=True,
          enable_punctuation_prediction=True,
          enable_inverse_text_normalization=True,
      )
    
      async for message in websocket:
          if message == b"$END$":
              break
          sr.send_audio(message)
    
      sr.stop()
      # 下一行代碼很重要!沒有的話 FuncCallback 裏面的 loop.create_task 不會被觸發
      await asyncio.sleep(0)
    
      while callback.state not in ("completed", "error"):
          await asyncio.sleep(0.1)
          print(f"callback.state: {callback.state}")
    
      await websocket.send("$END$")  # 發送自定義結束標誌
    
    async def main():
      host = "127.0.0.1"
      port = 8765
      async with serve(asr, host, port) as server:
          print(f"Server started {host}:{port}...")
          await server.serve_forever()
    
    if __name__ == "__main__":
      asyncio.run(main())

相關閲讀

  • 阿里雲實時語音識別官方文檔 https://help.aliyun.com/zh/isi/developer-reference/sdk-for-py...
  • websockets GitHub 倉庫:https://github.com/python-websockets/websockets
本文出自 qbit sanp
user avatar mingtiaoiv 頭像 taoqix 頭像 fanudekaixinguo 頭像 aitechshare 頭像 openhacking 頭像 yolindeng 頭像 longronglang 頭像
點贊 7 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.