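"""Periodically detect live-stream links and forward them to BiliLiveAPI.

scan_lives() walks detect_list, resolves each entry with Detection.analyze_link()
and posts any resolved live URL via BiliLiveAPI.add_live(). auto_detect_task()
schedules a scan every Detection_Interval seconds, and imitate_server() feeds
messages into Thread_que from stdin to simulate the real server.
"""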
import os
import queue
import asyncio
import sys
import threading
import time
from bililive_api import BiliLiveAPI
from detect_list import DetectList
from conf.settings import WORK_DIR, CONF_DIR, OUTPUT, logger, Detection_Interval
from detection import Detection

Thread_que = queue.Queue()
scan_live_busy = False
detection = Detection()
detect_list = DetectList()
biliapi = BiliLiveAPI()


def scan_lives():
    logger.debug(f"Detection_Interval: {Detection_Interval}, run scan_lives...")
    # str_list = detect_list.get_all_str()
    # Items cannot be removed while iterating over the list, or the indices
    # would shift; record the indices to delete in del_index and remove them
    # after the loop.
    del_index = []
    for index in range(len(detect_list.list)):
        item = detect_list.list[index]
        logger.debug(f'analyze item: {item}')
        key, url = detect_list.get_best_key(item)
        if not key:
            del_index.append(index)
            continue

        res = detection.analyze_link(url)
        if not res:
            continue
        if res.get("homepage"):
            logger.debug(f'update_list: {index} {res}')
            # Replace the string the user copied with the parsed result so it
            # can be reused on the next scan.
            detect_list.update_list(index, res)
        elif res.get("live_url"):
            post_res = biliapi.add_live(res.get("live_url"))
            logger.debug(f'add_live result: {post_res}')
            del_index.append(index)

    detect_list.delete_list(del_index)
    logger.debug(f"scan_lives finish, detect_list = {detect_list.list}")


async def auto_detect_interval(que: asyncio.Queue, finish_event: asyncio.Event):
    while True:
        # Put 'detect' and then wait for finish_event: scan_lives may take a
        # long time, so the timer only restarts after the previous scan is done.
        await que.put('detect')
        await finish_event.wait()
        finish_event.clear()
        await asyncio.sleep(Detection_Interval)


async def auto_detect_task(auto_detect_queue: asyncio.Queue):
    loop = asyncio.get_running_loop()
    finish_event = asyncio.Event()
    asyncio.create_task(auto_detect_interval(auto_detect_queue, finish_event))
    while True:
        msg = await auto_detect_queue.get()
        logger.debug(f'get msg {msg}')
        if msg == 'stop':
            break
        elif msg == 'detect':
            # Run the blocking scan in a worker thread so the event loop
            # stays responsive while it runs.
            await loop.run_in_executor(None, scan_lives)
            finish_event.set()
        else:
            # Any other message is expected to be a dict like {'add': [...]}
            # or a raw string copied by the user.
            add_lives = msg.get('add', []) if isinstance(msg, dict) else [msg]
            if not add_lives:
                continue
            # add_live appends the new entries and scans them right away;
            # run it in a worker thread so the event loop is not blocked.
            await loop.run_in_executor(None, add_live, add_lives)


def add_live(lives):
    detect_list.append_list(lives)
    scan_lives()

# Leftover from an earlier async input handler (cf. the commented-out
# anayze_input_string task in main_task):
# while True:
#     msg = await input_msg.get()
#     logger.info(msg)
#     # Open a new browser here, otherwise it would clash with the one used in auto_detect_task.
#     res = Detection().analyze_link(msg)
#     if res:
#         detect_list.append_list(res)


async def main_task():
    logger.debug('start task')
    analyze_queue = asyncio.Queue()
    auto_detect_queue = asyncio.Queue()
    # asyncio.create_task(anayze_input_string(analyze_queue))
    task_auto_detect = asyncio.create_task(auto_detect_task(auto_detect_queue))
    while True:
        if Thread_que.empty():
            await asyncio.sleep(0.1)
            continue
        msg = Thread_que.get_nowait()
        logger.debug(f"get msg from Thread_que: {msg}")
        if msg == "stop":
            logger.debug('wait task_auto_detect')
            detection.page.close_tabs()
            # Ask the auto-detect task to exit, then wait for it to finish.
            auto_detect_queue.put_nowait('stop')
            await task_auto_detect
            break
        else:
            auto_detect_queue.put_nowait(msg)
    logger.debug('main_task exit')


def run_task():
    asyncio.run(main_task())


# Simulate the server pushing messages onto the queue.
def imitate_server():
    thread = threading.Thread(target=run_task)
    thread.start()
    while True:
        try:
            indat = input('input:')
            Thread_que.put_nowait(indat)
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt")
            detection.page.close_tabs()
            break
    Thread_que.put_nowait("stop")
    logger.info('put stop msg')
    thread.join()
    logger.info('exit')


if __name__ == "__main__":
    imitate_server()