# task.py
  1. import os
  2. import queue
  3. import asyncio
  4. import sys
  5. import threading
  6. import time
  7. from bililive_api import BiliLiveAPI
  8. from detect_list import DetectList
  9. from conf.settings import WORK_DIR,CONF_DIR,OUTPUT,logger,Detection_Interval
  10. from detection import Detection
  11. Thread_que = queue.Queue()
  12. scan_live_busy = False
  13. detection = Detection()
  14. detect_list = DetectList()
  15. biliapi = BiliLiveAPI()
  16. def scan_lives():
  17. logger.debug(f"Detection_Interval :{Detection_Interval} , run scan_lives... ")
  18. # str_list = detect_list.get_all_str()
  19. # 数组遍历过程中,不能删除数据,否则下标会错乱,需要先临时记录要删除的下标到 del_index 中,循环结束后再删除
  20. del_index = []
  21. for index in range(len(detect_list.list)):
  22. item = detect_list.list[index]
  23. logger.debug(f'analyze item:{item}')
  24. key, url = detect_list.get_best_key(item)
  25. if not key:
  26. del_index.append(index)
  27. continue
  28. res = detection.analyze_link(url)
  29. if not res:
  30. continue
  31. if res.get("homepage"):
  32. logger.debug(f'update_list:{index} {res}')
  33. # 将用户拷贝的字符串替换成解析结果,以便下次使用
  34. detect_list.update_list(index, res)
  35. elif res.get("live_url"):
  36. post_res = biliapi.add_live(res.get("live_url"))
  37. del_index.append(index)
  38. detect_list.delete_list(del_index)
  39. logger.debug(f"scan_lives finish,detect_list = {detect_list.list}")
  40. async def auto_detect_interval(que:asyncio.Queue, finish_event:asyncio.Event):
  41. while True:
  42. # await que.put 是因为 thread_scan_lives 执行过久,需要先等待它执行完毕,才能重新开始计时
  43. await que.put('detect')
  44. await finish_event.wait()
  45. finish_event.clear()
  46. await asyncio.sleep(Detection_Interval)
  47. # await asyncio.sleep(3)
  48. async def auto_detect_task(auto_detect_queue:asyncio.Queue):
  49. loop = asyncio.get_event_loop()
  50. finish_event = asyncio.Event()
  51. asyncio.create_task(auto_detect_interval(auto_detect_queue, finish_event))
  52. while 1:
  53. msg = await auto_detect_queue.get()
  54. logger.debug(f'get msg {msg}')
  55. if msg == 'stop':
  56. break
  57. elif msg == 'detect':
  58. thread_scan_lives = threading.Thread(target=scan_lives)
  59. thread_scan_lives.start()
  60. await loop.run_in_executor(None, thread_scan_lives.join)
  61. finish_event.set()
  62. else :
  63. add_lives = msg.get('add', [])
  64. if add_lives:
  65. detect_list.append_list(add_lives)
  66. continue
  67. thread_add_lives = threading.Thread(target=add_live, args=add_lives)
  68. thread_add_lives.start()
  69. await loop.run_in_executor(None, thread_add_lives.join)
  70. def add_live(lives):
  71. detect_list.append_list(lives)
  72. scan_lives()
  73. # while True:
  74. # msg = await input_msg.get()
  75. # logger.info(msg)
  76. # # 新开一个浏览器,否则会和 auto_detect_task 中的浏览器重叠
  77. # res = Detection().analyze_link(msg)
  78. # if res:
  79. # detect_list.append_list(res)
  80. async def main_task():
  81. logger.debug('start task')
  82. analyze_queue = asyncio.Queue()
  83. auto_detect_queue = asyncio.Queue()
  84. # asyncio.create_task(anayze_input_string(analyze_queue))
  85. task_auto_detect = asyncio.create_task(auto_detect_task(auto_detect_queue))
  86. while True:
  87. if Thread_que.empty():
  88. await asyncio.sleep(0.1)
  89. continue
  90. msg = Thread_que.get_nowait()
  91. logger.debug(f"get msg from Thread_que:{msg}")
  92. if msg == "stop":
  93. logger.debug('wait task_auto_detect')
  94. detection.page.close_tabs()
  95. # await task_auto_detect
  96. break
  97. else:
  98. auto_detect_queue.put_nowait(msg)
  99. logger.debug('break')
  100. def run_task():
  101. asyncio.run(main_task())
  102. # 模拟服务器发送队列信息
  103. def imitate_server():
  104. thread = threading.Thread(target=run_task)
  105. thread.start()
  106. while True:
  107. try:
  108. indat = input('input:')
  109. Thread_que.put_nowait(indat)
  110. # Thread_que.put_nowait("123")
  111. # time.sleep(1)
  112. except KeyboardInterrupt:
  113. logger.info("KeyboardInterrupt")
  114. detection.page.close_tabs()
  115. break
  116. Thread_que.put_nowait("stop")
  117. logger.info('put stop msg')
  118. thread.join()
  119. logger.info('exit')
  120. if __name__ == "__main__":
  121. imitate_server()