execute_server.py 1.1 KB

123456789101112131415161718192021222324252627282930313233
# execute_server.py: development-only server used to test the runtime client. It is not used in production.
  2. import asyncio
  3. import websockets
  4. import pexpect
  5. from websockets.exceptions import ConnectionClosed
  6. import json
  7. # Function for testing execution of shell commands
  8. async def execute_command(websocket, path):
  9. shell = pexpect.spawn('/bin/bash', encoding='utf-8')
  10. shell.expect(r'[$#] ')
  11. try:
  12. async for message in websocket:
  13. try:
  14. print(f"Received command: {message}")
  15. shell.sendline(message)
  16. shell.expect(r'[$#] ')
  17. output = shell.before.strip().split('\r\n', 1)[1].strip()
  18. print("Yufan:",output)
  19. await websocket.send(output)
  20. except Exception as e:
  21. await websocket.send(f"Error: {str(e)}")
  22. except ConnectionClosed:
  23. print("Connection closed")
  24. finally:
  25. shell.close()
  26. start_server = websockets.serve(execute_command, "0.0.0.0", 8080)
  27. asyncio.get_event_loop().run_until_complete(start_server)
  28. asyncio.get_event_loop().run_forever()