Jelajahi Sumber

exit when send all data for python ws client (#627)

* keep blank for english asr result

* exit when finished for python ws client
zhaomingwork 2 tahun lalu
induk
melakukan
38ccf84873
1 mengubah file dengan 39 tambahan dan 34 penghapusan
  1. 39 34
      funasr/runtime/python/websocket/wss_client_asr.py

+ 39 - 34
funasr/runtime/python/websocket/wss_client_asr.py

@@ -71,7 +71,6 @@ print(args)
 from queue import Queue
 
 voices = Queue()
-
 ibest_writer = None
 if args.output_dir is not None:
     writer = DatadirWriter(args.output_dir)
@@ -118,9 +117,11 @@ async def record_from_scp(chunk_begin, chunk_size):
         wavs = wavs[chunk_begin:chunk_begin + chunk_size]
     for wav in wavs:
         wav_splits = wav.strip().split()
+ 
         wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo"
         wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0]
-
+        if not len(wav_path.strip())>0:
+           continue
         if wav_path.endswith(".pcm"):
             with open(wav_path, "rb") as f:
                 audio_bytes = f.read()
@@ -142,47 +143,43 @@ async def record_from_scp(chunk_begin, chunk_size):
         # send first time
         message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval,
                               "wav_name": wav_name, "is_speaking": True})
-        voices.put(message)
+        #voices.put(message)
+        await websocket.send(message)
         is_speaking = True
         for i in range(chunk_num):
 
             beg = i * stride
             data = audio_bytes[beg:beg + stride]
             message = data
-            voices.put(message)
+            #voices.put(message)
+            await websocket.send(message)
             if i == chunk_num - 1:
                 is_speaking = False
                 message = json.dumps({"is_speaking": is_speaking})
-                voices.put(message)
+                #voices.put(message)
+                await websocket.send(message)
             # print("data_chunk: ", len(data_chunk))
             # print(voices.qsize())
             sleep_duration = 0.001 if args.send_without_sleep else 60 * args.chunk_size[1] / args.chunk_interval / 1000
             await asyncio.sleep(sleep_duration)
+    while not voices.empty():
+         await asyncio.sleep(1)
+    await asyncio.sleep(3)
+    await websocket.close()
+     
+ 
+ 
 
-async def ws_send():
-    global voices
-    global websocket
-    print("started to sending data!")
-    while True:
-        while not voices.empty():
-            data = voices.get()
-            voices.task_done()
-            try:
-                await websocket.send(data)
-            except Exception as e:
-                print('Exception occurred:', e)
-                traceback.print_exc()
-                exit(0)
-            await asyncio.sleep(0.005)
-        await asyncio.sleep(0.005)
-
+ 
+             
 async def message(id):
-    global websocket
+    global websocket,voices
     text_print = ""
     text_print_2pass_online = ""
     text_print_2pass_offline = ""
-    while True:
-        try:
+    try:
+       while True:
+        
             meg = await websocket.recv()
             meg = json.loads(meg)
             wav_name = meg.get("wav_name", "demo")
@@ -213,10 +210,11 @@ async def message(id):
                 os.system('clear')
                 print("\rpid" + str(id) + ": " + text_print)
 
-        except Exception as e:
+    except Exception as e:
             print("Exception:", e)
-            traceback.print_exc()
-            exit(0)
+            #traceback.print_exc()
+            #await websocket.close()
+ 
 
 
 async def print_messge():
@@ -228,11 +226,16 @@ async def print_messge():
             print(meg)
         except Exception as e:
             print("Exception:", e)
-            traceback.print_exc()
+            #traceback.print_exc()
             exit(0)
 
 async def ws_client(id, chunk_begin, chunk_size):
-    global websocket
+  if args.audio_in is None:
+       chunk_begin=0
+       chunk_size=1
+  global websocket,voices
+  for i in range(chunk_begin,chunk_begin+chunk_size):
+    voices = Queue()
     if args.ssl == 1:
         ssl_context = ssl.SSLContext()
         ssl_context.check_hostname = False
@@ -242,14 +245,16 @@ async def ws_client(id, chunk_begin, chunk_size):
         uri = "ws://{}:{}".format(args.host, args.port)
         ssl_context = None
     print("connect to", uri)
-    async for websocket in websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context):
+    async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context) as websocket:
         if args.audio_in is not None:
-            task = asyncio.create_task(record_from_scp(chunk_begin, chunk_size))
+            task = asyncio.create_task(record_from_scp(i, 1))
         else:
             task = asyncio.create_task(record_microphone())
-        task2 = asyncio.create_task(ws_send())
+        #task2 = asyncio.create_task(ws_send())
         task3 = asyncio.create_task(message(id))
-        await asyncio.gather(task, task2, task3)
+        await asyncio.gather(task, task3)
+  exit(0)
+    
 
 def one_thread(id, chunk_begin, chunk_size):
     asyncio.get_event_loop().run_until_complete(ws_client(id, chunk_begin, chunk_size))