test.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import asyncio
  2. import os
  3. import queue
  4. import sys
  5. import threading
  6. import time
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. from conf.settings import WORK_DIR,CONF_DIR,OUTPUT,logger,Detection_Interval
  9. Thread_que = queue.Queue()
  10. # from detection import Detection
  11. # def analyze():
  12. # detection = Detection()
  13. # detection.analyze_link('123')
  14. async def main_task():
  15. stop_event = asyncio.Event()
  16. task_auto_detect = asyncio.create_task(auto_detect_task(stop_event))
  17. while True:
  18. logger.info("m")
  19. await asyncio.sleep(1)
  20. def run_task():
  21. asyncio.run(main_task())
  22. async def auto_detect_task(stop_event:asyncio.Event=None):
  23. loop = asyncio.get_event_loop()
  24. while 1:
  25. # await asyncio.sleep(Detection_Interval)
  26. await asyncio.sleep(1)
  27. logger.debug(f"thread_scan_lives finish:{stop_event.is_set()}")
  28. if stop_event.is_set():
  29. logger.debug("stop event")
  30. break
  31. def imitate_server():
  32. thread = threading.Thread(target=run_task)
  33. thread.start()
  34. while True:
  35. try:
  36. indat = input('input:')
  37. except KeyboardInterrupt:
  38. logger.info("KeyboardInterrupt")
  39. break
  40. # Thread_que.put_nowait("stop")
  41. logger.info('put stop msg')
  42. thread.join()
  43. logger.info('exit')
  44. # sys.exit(0)
  45. if __name__ == "__main__":
  46. # imitate_server()
  47. live_page_user_name = {}
  48. ret = {"live_url": "live_url"}
  49. ret.update(live_page_user_name)
  50. print(ret.update(live_page_user_name))
  51. print(ret)