grpc_server.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from concurrent import futures
  2. import grpc
  3. import json
  4. import paraformer_pb2
  5. import paraformer_pb2_grpc
  6. import time
  7. from paraformer_pb2 import Response
  8. from modelscope.pipelines import pipeline
  9. from modelscope.utils.constant import Tasks
  10. class ASRServicer(paraformer_pb2_grpc.ASRServicer):
  11. def __init__(self, user_allowed, model, sample_rate):
  12. print("ASRServicer init")
  13. self.init_flag = 0
  14. self.client_buffers = {}
  15. self.client_transcription = {}
  16. self.auth_user = user_allowed.split("|")
  17. self.inference_16k_pipline = pipeline(task=Tasks.auto_speech_recognition, model=model)
  18. self.sample_rate = sample_rate
  19. def clear_states(self, user):
  20. self.clear_buffers(user)
  21. self.clear_transcriptions(user)
  22. def clear_buffers(self, user):
  23. if user in self.client_buffers:
  24. del self.client_buffers[user]
  25. def clear_transcriptions(self, user):
  26. if user in self.client_transcription:
  27. del self.client_transcription[user]
  28. def disconnect(self, user):
  29. self.clear_states(user)
  30. print("Disconnecting user: %s" % str(user))
  31. def Recognize(self, request_iterator, context):
  32. for req in request_iterator:
  33. if req.user not in self.auth_user:
  34. result = {}
  35. result["success"] = False
  36. result["detail"] = "Not Authorized user: %s " % req.user
  37. result["text"] = ""
  38. yield Response(sentence=json.dumps(result), user=req.user, action="terminate", language=req.language)
  39. elif req.isEnd: #end grpc
  40. print("asr end")
  41. self.disconnect(req.user)
  42. result = {}
  43. result["success"] = True
  44. result["detail"] = "asr end"
  45. result["text"] = ""
  46. yield Response(sentence=json.dumps(result), user=req.user, action="terminate",language=req.language)
  47. elif req.speaking: #continue speaking
  48. if req.audio_data is not None and len(req.audio_data) > 0:
  49. if req.user in self.client_buffers:
  50. self.client_buffers[req.user] += req.audio_data #append audio
  51. else:
  52. self.client_buffers[req.user] = req.audio_data
  53. result = {}
  54. result["success"] = True
  55. result["detail"] = "speaking"
  56. result["text"] = ""
  57. yield Response(sentence=json.dumps(result), user=req.user, action="speaking", language=req.language)
  58. elif not req.speaking: #silence
  59. if req.user not in self.client_buffers:
  60. result = {}
  61. result["success"] = True
  62. result["detail"] = "waiting_for_voice"
  63. result["text"] = ""
  64. yield Response(sentence=json.dumps(result), user=req.user, action="waiting", language=req.language)
  65. else:
  66. begin_time = int(round(time.time() * 1000))
  67. tmp_data = self.client_buffers[req.user]
  68. self.clear_states(req.user)
  69. result = {}
  70. result["success"] = True
  71. result["detail"] = "decoding data: %d bytes" % len(tmp_data)
  72. result["text"] = ""
  73. yield Response(sentence=json.dumps(result), user=req.user, action="decoding", language=req.language)
  74. if len(tmp_data) < 800: #min input_len for asr model
  75. end_time = int(round(time.time() * 1000))
  76. delay_str = str(end_time - begin_time)
  77. result = {}
  78. result["success"] = True
  79. result["detail"] = "finish_sentence_data_is_not_long_enough"
  80. result["server_delay_ms"] = delay_str
  81. result["text"] = ""
  82. print ("user: %s , delay(ms): %s, error: %s " % (req.user, delay_str, "data_is_not_long_enough"))
  83. yield Response(sentence=json.dumps(result), user=req.user, action="finish", language=req.language)
  84. else:
  85. asr_result = self.inference_16k_pipline(audio_in=tmp_data, audio_fs = self.sample_rate)
  86. if "text" in asr_result:
  87. asr_result = asr_result['text']
  88. else:
  89. asr_result = ""
  90. end_time = int(round(time.time() * 1000))
  91. delay_str = str(end_time - begin_time)
  92. print ("user: %s , delay(ms): %s, text: %s " % (req.user, delay_str, asr_result))
  93. result = {}
  94. result["success"] = True
  95. result["detail"] = "finish_sentence"
  96. result["server_delay_ms"] = delay_str
  97. result["text"] = asr_result
  98. yield Response(sentence=json.dumps(result), user=req.user, action="finish", language=req.language)
  99. else:
  100. result = {}
  101. result["success"] = False
  102. result["detail"] = "error, no condition matched! Unknown reason."
  103. result["text"] = ""
  104. self.disconnect(req.user)
  105. yield Response(sentence=json.dumps(result), user=req.user, action="terminate", language=req.language)