test_storage.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from __future__ import annotations
  2. import os
  3. import shutil
  4. from abc import ABC
  5. from dataclasses import dataclass, field
  6. from io import StringIO
  7. from typing import Dict, List, Optional
  8. from unittest import TestCase
  9. from unittest.mock import patch
  10. from openhands.storage.files import FileStore
  11. from openhands.storage.google_cloud import GoogleCloudFileStore
  12. from openhands.storage.local import LocalFileStore
  13. from openhands.storage.memory import InMemoryFileStore
  14. class _StorageTest(ABC):
  15. store: FileStore
  16. def get_store(self) -> FileStore:
  17. return self.store
  18. def test_basic_fileops(self):
  19. filename = 'test.txt'
  20. store = self.get_store()
  21. store.write(filename, 'Hello, world!')
  22. self.assertEqual(store.read(filename), 'Hello, world!')
  23. self.assertEqual(store.list(''), [filename])
  24. store.delete(filename)
  25. with self.assertRaises(FileNotFoundError):
  26. store.read(filename)
  27. def test_complex_path_fileops(self):
  28. filenames = ['foo.bar.baz', './foo/bar/baz', 'foo/bar/baz', '/foo/bar/baz']
  29. store = self.get_store()
  30. for filename in filenames:
  31. store.write(filename, 'Hello, world!')
  32. self.assertEqual(store.read(filename), 'Hello, world!')
  33. store.delete(filename)
  34. with self.assertRaises(FileNotFoundError):
  35. store.read(filename)
  36. def test_list(self):
  37. store = self.get_store()
  38. store.write('foo.txt', 'Hello, world!')
  39. store.write('bar.txt', 'Hello, world!')
  40. store.write('baz.txt', 'Hello, world!')
  41. file_names = store.list('')
  42. file_names.sort()
  43. self.assertEqual(file_names, ['bar.txt', 'baz.txt', 'foo.txt'])
  44. store.delete('foo.txt')
  45. store.delete('bar.txt')
  46. store.delete('baz.txt')
  47. def test_deep_list(self):
  48. store = self.get_store()
  49. store.write('foo/bar/baz.txt', 'Hello, world!')
  50. store.write('foo/bar/qux.txt', 'Hello, world!')
  51. store.write('foo/bar/quux.txt', 'Hello, world!')
  52. self.assertEqual(store.list(''), ['foo/'])
  53. self.assertEqual(store.list('foo'), ['foo/bar/'])
  54. file_names = store.list('foo/bar')
  55. file_names.sort()
  56. self.assertEqual(
  57. file_names, ['foo/bar/baz.txt', 'foo/bar/quux.txt', 'foo/bar/qux.txt']
  58. )
  59. store.delete('foo/bar/baz.txt')
  60. store.delete('foo/bar/qux.txt')
  61. store.delete('foo/bar/quux.txt')
  62. class TestLocalFileStore(TestCase, _StorageTest):
  63. def setUp(self):
  64. os.makedirs('./_test_files_tmp', exist_ok=True)
  65. self.store = LocalFileStore('./_test_files_tmp')
  66. def tearDown(self):
  67. shutil.rmtree('./_test_files_tmp')
  68. class TestInMemoryFileStore(TestCase, _StorageTest):
  69. def setUp(self):
  70. self.store = InMemoryFileStore()
  71. class TestGoogleCloudFileStore(TestCase, _StorageTest):
  72. def setUp(self):
  73. with patch('google.cloud.storage.Client', _MockGoogleCloudClient):
  74. self.store = GoogleCloudFileStore('dear-liza')
  75. # I would have liked to use cloud-storage-mocker here but the python versions were incompatible :(
  76. # If we write tests for the S3 storage class I would definitely recommend we use moto.
  77. class _MockGoogleCloudClient:
  78. def bucket(self, name: str):
  79. assert name == 'dear-liza'
  80. return _MockGoogleCloudBucket()
  81. @dataclass
  82. class _MockGoogleCloudBucket:
  83. blobs_by_path: Dict[str, _MockGoogleCloudBlob] = field(default_factory=dict)
  84. def blob(self, path: Optional[str] = None) -> _MockGoogleCloudBlob:
  85. return self.blobs_by_path.get(path) or _MockGoogleCloudBlob(self, path)
  86. def list_blobs(self, prefix: Optional[str] = None) -> List[_MockGoogleCloudBlob]:
  87. blobs = list(self.blobs_by_path.values())
  88. if prefix and prefix != '/':
  89. blobs = [blob for blob in blobs if blob.name.startswith(prefix)]
  90. return blobs
  91. @dataclass
  92. class _MockGoogleCloudBlob:
  93. bucket: _MockGoogleCloudBucket
  94. name: str
  95. content: Optional[str | bytes] = None
  96. def open(self, op: str):
  97. if op == 'r':
  98. if self.content is None:
  99. raise FileNotFoundError()
  100. return StringIO(self.content)
  101. if op == 'w':
  102. return _MockGoogleCloudBlobWriter(self)
  103. def delete(self):
  104. del self.bucket.blobs_by_path[self.name]
  105. @dataclass
  106. class _MockGoogleCloudBlobWriter:
  107. blob: _MockGoogleCloudBlob
  108. content: str | bytes = None
  109. def __enter__(self):
  110. return self
  111. def write(self, __b):
  112. assert (
  113. self.content is None
  114. ) # We don't support buffered writes in this mock for now, as it is not needed
  115. self.content = __b
  116. def __exit__(self, exc_type, exc_val, exc_tb):
  117. blob = self.blob
  118. blob.content = self.content
  119. blob.bucket.blobs_by_path[blob.name] = blob