test_storage.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. from __future__ import annotations
  2. import os
  3. import shutil
  4. from abc import ABC
  5. from dataclasses import dataclass, field
  6. from io import StringIO
  7. from typing import Dict, List, Optional
  8. from unittest import TestCase
  9. from unittest.mock import patch
  10. from openhands.storage.files import FileStore
  11. from openhands.storage.google_cloud import GoogleCloudFileStore
  12. from openhands.storage.local import LocalFileStore
  13. from openhands.storage.memory import InMemoryFileStore
  14. class _StorageTest(ABC):
  15. store: FileStore
  16. def get_store(self) -> FileStore:
  17. try:
  18. self.store.delete('')
  19. except Exception:
  20. pass
  21. return self.store
  22. def test_basic_fileops(self):
  23. filename = 'test.txt'
  24. store = self.get_store()
  25. store.write(filename, 'Hello, world!')
  26. self.assertEqual(store.read(filename), 'Hello, world!')
  27. self.assertEqual(store.list(''), [filename])
  28. store.delete(filename)
  29. with self.assertRaises(FileNotFoundError):
  30. store.read(filename)
  31. def test_complex_path_fileops(self):
  32. filenames = ['foo.bar.baz', './foo/bar/baz', 'foo/bar/baz', '/foo/bar/baz']
  33. store = self.get_store()
  34. for filename in filenames:
  35. store.write(filename, 'Hello, world!')
  36. self.assertEqual(store.read(filename), 'Hello, world!')
  37. store.delete(filename)
  38. with self.assertRaises(FileNotFoundError):
  39. store.read(filename)
  40. def test_list(self):
  41. store = self.get_store()
  42. store.write('foo.txt', 'Hello, world!')
  43. store.write('bar.txt', 'Hello, world!')
  44. store.write('baz.txt', 'Hello, world!')
  45. file_names = store.list('')
  46. file_names.sort()
  47. self.assertEqual(file_names, ['bar.txt', 'baz.txt', 'foo.txt'])
  48. store.delete('foo.txt')
  49. store.delete('bar.txt')
  50. store.delete('baz.txt')
  51. def test_deep_list(self):
  52. store = self.get_store()
  53. store.write('foo/bar/baz.txt', 'Hello, world!')
  54. store.write('foo/bar/qux.txt', 'Hello, world!')
  55. store.write('foo/bar/quux.txt', 'Hello, world!')
  56. self.assertEqual(store.list(''), ['foo/'])
  57. self.assertEqual(store.list('foo'), ['foo/bar/'])
  58. file_names = store.list('foo/bar')
  59. file_names.sort()
  60. self.assertEqual(
  61. file_names, ['foo/bar/baz.txt', 'foo/bar/quux.txt', 'foo/bar/qux.txt']
  62. )
  63. store.delete('foo/bar/baz.txt')
  64. store.delete('foo/bar/qux.txt')
  65. store.delete('foo/bar/quux.txt')
  66. class TestLocalFileStore(TestCase, _StorageTest):
  67. def setUp(self):
  68. os.makedirs('./_test_files_tmp', exist_ok=True)
  69. self.store = LocalFileStore('./_test_files_tmp')
  70. def tearDown(self):
  71. shutil.rmtree('./_test_files_tmp')
  72. class TestInMemoryFileStore(TestCase, _StorageTest):
  73. def setUp(self):
  74. self.store = InMemoryFileStore()
  75. class TestGoogleCloudFileStore(TestCase, _StorageTest):
  76. def setUp(self):
  77. with patch('google.cloud.storage.Client', _MockGoogleCloudClient):
  78. self.store = GoogleCloudFileStore('dear-liza')
  79. # I would have liked to use cloud-storage-mocker here but the python versions were incompatible :(
  80. # If we write tests for the S3 storage class I would definitely recommend we use moto.
  81. class _MockGoogleCloudClient:
  82. def bucket(self, name: str):
  83. assert name == 'dear-liza'
  84. return _MockGoogleCloudBucket()
  85. @dataclass
  86. class _MockGoogleCloudBucket:
  87. blobs_by_path: Dict[str, _MockGoogleCloudBlob] = field(default_factory=dict)
  88. def blob(self, path: Optional[str] = None) -> _MockGoogleCloudBlob:
  89. return self.blobs_by_path.get(path) or _MockGoogleCloudBlob(self, path)
  90. def list_blobs(self, prefix: Optional[str] = None) -> List[_MockGoogleCloudBlob]:
  91. blobs = list(self.blobs_by_path.values())
  92. if prefix and prefix != '/':
  93. blobs = [blob for blob in blobs if blob.name.startswith(prefix)]
  94. return blobs
  95. @dataclass
  96. class _MockGoogleCloudBlob:
  97. bucket: _MockGoogleCloudBucket
  98. name: str
  99. content: Optional[str | bytes] = None
  100. def open(self, op: str):
  101. if op == 'r':
  102. if self.content is None:
  103. raise FileNotFoundError()
  104. return StringIO(self.content)
  105. if op == 'w':
  106. return _MockGoogleCloudBlobWriter(self)
  107. def delete(self):
  108. del self.bucket.blobs_by_path[self.name]
  109. @dataclass
  110. class _MockGoogleCloudBlobWriter:
  111. blob: _MockGoogleCloudBlob
  112. content: str | bytes = None
  113. def __enter__(self):
  114. return self
  115. def write(self, __b):
  116. assert (
  117. self.content is None
  118. ) # We don't support buffered writes in this mock for now, as it is not needed
  119. self.content = __b
  120. def __exit__(self, exc_type, exc_val, exc_tb):
  121. blob = self.blob
  122. blob.content = self.content
  123. blob.bucket.blobs_by_path[blob.name] = blob