# ws_client.py
  1. # -*- encoding: utf-8 -*-
  2. import os
  3. import time
  4. import websockets
  5. import asyncio
  6. # import threading
  7. import argparse
  8. import json
  9. parser = argparse.ArgumentParser()
  10. parser.add_argument("--host",
  11. type=str,
  12. default="localhost",
  13. required=False,
  14. help="host ip, localhost, 0.0.0.0")
  15. parser.add_argument("--port",
  16. type=int,
  17. default=10095,
  18. required=False,
  19. help="grpc server port")
  20. parser.add_argument("--chunk_size",
  21. type=str,
  22. default="5, 10, 5",
  23. help="chunk")
  24. parser.add_argument("--chunk_interval",
  25. type=int,
  26. default=10,
  27. help="chunk")
  28. parser.add_argument("--audio_in",
  29. type=str,
  30. default=None,
  31. help="audio_in")
  32. args = parser.parse_args()
  33. args.chunk_size = [int(x) for x in args.chunk_size.split(",")]
  34. # voices = asyncio.Queue()
  35. from queue import Queue
  36. voices = Queue()
# Other functions can send data by calling send(data), for example:
  38. async def record_microphone():
  39. is_finished = False
  40. import pyaudio
  41. #print("2")
  42. global voices
  43. FORMAT = pyaudio.paInt16
  44. CHANNELS = 1
  45. RATE = 16000
  46. chunk_size = 60*args.chunk_size[1]/args.chunk_interval
  47. CHUNK = int(RATE / 1000 * chunk_size)
  48. p = pyaudio.PyAudio()
  49. stream = p.open(format=FORMAT,
  50. channels=CHANNELS,
  51. rate=RATE,
  52. input=True,
  53. frames_per_buffer=CHUNK)
  54. is_speaking = True
  55. while True:
  56. data = stream.read(CHUNK)
  57. data = data.decode('ISO-8859-1')
  58. message = json.dumps({"chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "audio": data, "is_speaking": is_speaking, "is_finished": is_finished})
  59. voices.put(message)
  60. #print(voices.qsize())
  61. await asyncio.sleep(0.005)
# Other functions can send data by calling send(data), for example:
  63. async def record_from_scp():
  64. import wave
  65. global voices
  66. is_finished = False
  67. if args.audio_in.endswith(".scp"):
  68. f_scp = open(args.audio_in)
  69. wavs = f_scp.readlines()
  70. else:
  71. wavs = [args.audio_in]
  72. for wav in wavs:
  73. wav_splits = wav.strip().split()
  74. wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0]
  75. # bytes_f = open(wav_path, "rb")
  76. # bytes_data = bytes_f.read()
  77. with wave.open(wav_path, "rb") as wav_file:
  78. # 获取音频参数
  79. params = wav_file.getparams()
  80. # 获取头信息的长度
  81. # header_length = wav_file.getheaders()[0][1]
  82. # 读取音频帧数据,跳过头信息
  83. # wav_file.setpos(header_length)
  84. frames = wav_file.readframes(wav_file.getnframes())
  85. # 将音频帧数据转换为字节类型的数据
  86. audio_bytes = bytes(frames)
  87. # stride = int(args.chunk_size/1000*16000*2)
  88. stride = int(60*args.chunk_size[1]/args.chunk_interval/1000*16000*2)
  89. chunk_num = (len(audio_bytes)-1)//stride + 1
  90. # print(stride)
  91. is_speaking = True
  92. for i in range(chunk_num):
  93. if i == chunk_num-1:
  94. is_speaking = False
  95. beg = i*stride
  96. data = audio_bytes[beg:beg+stride]
  97. data = data.decode('ISO-8859-1')
  98. message = json.dumps({"chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "is_speaking": is_speaking, "audio": data, "is_finished": is_finished})
  99. voices.put(message)
  100. # print("data_chunk: ", len(data_chunk))
  101. # print(voices.qsize())
  102. await asyncio.sleep(60*args.chunk_size[1]/args.chunk_interval/1000)
  103. is_finished = True
  104. message = json.dumps({"is_finished": is_finished})
  105. voices.put(message)
  106. async def ws_send():
  107. global voices
  108. global websocket
  109. print("started to sending data!")
  110. while True:
  111. while not voices.empty():
  112. data = voices.get()
  113. voices.task_done()
  114. try:
  115. await websocket.send(data) # 通过ws对象发送数据
  116. except Exception as e:
  117. print('Exception occurred:', e)
  118. await asyncio.sleep(0.005)
  119. await asyncio.sleep(0.005)
  120. async def message():
  121. global websocket
  122. text_print = ""
  123. while True:
  124. try:
  125. meg = await websocket.recv()
  126. meg = json.loads(meg)
  127. # print(meg, end = '')
  128. # print("\r")
  129. text = meg["text"][0]
  130. text_print += text
  131. text_print = text_print[-55:]
  132. os.system('clear')
  133. print("\r"+text_print)
  134. except Exception as e:
  135. print("Exception:", e)
  136. async def print_messge():
  137. global websocket
  138. while True:
  139. try:
  140. meg = await websocket.recv()
  141. meg = json.loads(meg)
  142. print(meg)
  143. except Exception as e:
  144. print("Exception:", e)
  145. async def ws_client():
  146. global websocket # 定义一个全局变量ws,用于保存websocket连接对象
  147. # uri = "ws://11.167.134.197:8899"
  148. uri = "ws://{}:{}".format(args.host, args.port)
  149. #ws = await websockets.connect(uri, subprotocols=["binary"]) # 创建一个长连接
  150. async for websocket in websockets.connect(uri, subprotocols=["binary"], ping_interval=None):
  151. if args.audio_in is not None:
  152. task = asyncio.create_task(record_from_scp()) # 创建一个后台任务录音
  153. else:
  154. task = asyncio.create_task(record_microphone()) # 创建一个后台任务录音
  155. task2 = asyncio.create_task(ws_send()) # 创建一个后台任务发送
  156. task3 = asyncio.create_task(message()) # 创建一个后台接收消息的任务
  157. await asyncio.gather(task, task2, task3)
  158. asyncio.get_event_loop().run_until_complete(ws_client()) # 启动协程
  159. asyncio.get_event_loop().run_forever()