file.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
import contextlib
import inspect
import os
import tempfile
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Generator, Union
from urllib.parse import urlparse

import requests
  10. def download_from_url(url):
  11. result = urlparse(url)
  12. file_path = None
  13. if result.scheme is not None and len(result.scheme) > 0:
  14. storage = HTTPStorage()
  15. # bytes
  16. data = storage.read(url)
  17. work_dir = tempfile.TemporaryDirectory().name
  18. if not os.path.exists(work_dir):
  19. os.makedirs(work_dir)
  20. file_path = os.path.join(work_dir, os.path.basename(url))
  21. with open(file_path, 'wb') as fb:
  22. fb.write(data)
  23. assert file_path is not None, f"failed to download: {url}"
  24. return file_path
  25. class Storage(metaclass=ABCMeta):
  26. """Abstract class of storage.
  27. All backends need to implement two apis: ``read()`` and ``read_text()``.
  28. ``read()`` reads the file as a byte stream and ``read_text()`` reads
  29. the file as texts.
  30. """
  31. @abstractmethod
  32. def read(self, filepath: str):
  33. pass
  34. @abstractmethod
  35. def read_text(self, filepath: str):
  36. pass
  37. @abstractmethod
  38. def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
  39. pass
  40. @abstractmethod
  41. def write_text(self,
  42. obj: str,
  43. filepath: Union[str, Path],
  44. encoding: str = 'utf-8') -> None:
  45. pass
  46. class LocalStorage(Storage):
  47. """Local hard disk storage"""
  48. def read(self, filepath: Union[str, Path]) -> bytes:
  49. """Read data from a given ``filepath`` with 'rb' mode.
  50. Args:
  51. filepath (str or Path): Path to read data.
  52. Returns:
  53. bytes: Expected bytes object.
  54. """
  55. with open(filepath, 'rb') as f:
  56. content = f.read()
  57. return content
  58. def read_text(self,
  59. filepath: Union[str, Path],
  60. encoding: str = 'utf-8') -> str:
  61. """Read data from a given ``filepath`` with 'r' mode.
  62. Args:
  63. filepath (str or Path): Path to read data.
  64. encoding (str): The encoding format used to open the ``filepath``.
  65. Default: 'utf-8'.
  66. Returns:
  67. str: Expected text reading from ``filepath``.
  68. """
  69. with open(filepath, 'r', encoding=encoding) as f:
  70. value_buf = f.read()
  71. return value_buf
  72. def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
  73. """Write data to a given ``filepath`` with 'wb' mode.
  74. Note:
  75. ``write`` will create a directory if the directory of ``filepath``
  76. does not exist.
  77. Args:
  78. obj (bytes): Data to be written.
  79. filepath (str or Path): Path to write data.
  80. """
  81. dirname = os.path.dirname(filepath)
  82. if dirname and not os.path.exists(dirname):
  83. os.makedirs(dirname, exist_ok=True)
  84. with open(filepath, 'wb') as f:
  85. f.write(obj)
  86. def write_text(self,
  87. obj: str,
  88. filepath: Union[str, Path],
  89. encoding: str = 'utf-8') -> None:
  90. """Write data to a given ``filepath`` with 'w' mode.
  91. Note:
  92. ``write_text`` will create a directory if the directory of
  93. ``filepath`` does not exist.
  94. Args:
  95. obj (str): Data to be written.
  96. filepath (str or Path): Path to write data.
  97. encoding (str): The encoding format used to open the ``filepath``.
  98. Default: 'utf-8'.
  99. """
  100. dirname = os.path.dirname(filepath)
  101. if dirname and not os.path.exists(dirname):
  102. os.makedirs(dirname, exist_ok=True)
  103. with open(filepath, 'w', encoding=encoding) as f:
  104. f.write(obj)
  105. @contextlib.contextmanager
  106. def as_local_path(
  107. self,
  108. filepath: Union[str,
  109. Path]) -> Generator[Union[str, Path], None, None]:
  110. """Only for unified API and do nothing."""
  111. yield filepath
  112. class HTTPStorage(Storage):
  113. """HTTP and HTTPS storage."""
  114. def read(self, url):
  115. # TODO @wenmeng.zwm add progress bar if file is too large
  116. r = requests.get(url)
  117. r.raise_for_status()
  118. return r.content
  119. def read_text(self, url):
  120. r = requests.get(url)
  121. r.raise_for_status()
  122. return r.text
  123. @contextlib.contextmanager
  124. def as_local_path(
  125. self, filepath: str) -> Generator[Union[str, Path], None, None]:
  126. """Download a file from ``filepath``.
  127. ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
  128. can be called with ``with`` statement, and when exists from the
  129. ``with`` statement, the temporary path will be released.
  130. Args:
  131. filepath (str): Download a file from ``filepath``.
  132. Examples:
  133. >>> storage = HTTPStorage()
  134. >>> # After existing from the ``with`` clause,
  135. >>> # the path will be removed
  136. >>> with storage.get_local_path('http://path/to/file') as path:
  137. ... # do something here
  138. """
  139. try:
  140. f = tempfile.NamedTemporaryFile(delete=False)
  141. f.write(self.read(filepath))
  142. f.close()
  143. yield f.name
  144. finally:
  145. os.remove(f.name)
  146. def write(self, obj: bytes, url: Union[str, Path]) -> None:
  147. raise NotImplementedError('write is not supported by HTTP Storage')
  148. def write_text(self,
  149. obj: str,
  150. url: Union[str, Path],
  151. encoding: str = 'utf-8') -> None:
  152. raise NotImplementedError(
  153. 'write_text is not supported by HTTP Storage')
  154. class OSSStorage(Storage):
  155. """OSS storage."""
  156. def __init__(self, oss_config_file=None):
  157. # read from config file or env var
  158. raise NotImplementedError(
  159. 'OSSStorage.__init__ to be implemented in the future')
  160. def read(self, filepath):
  161. raise NotImplementedError(
  162. 'OSSStorage.read to be implemented in the future')
  163. def read_text(self, filepath, encoding='utf-8'):
  164. raise NotImplementedError(
  165. 'OSSStorage.read_text to be implemented in the future')
  166. @contextlib.contextmanager
  167. def as_local_path(
  168. self, filepath: str) -> Generator[Union[str, Path], None, None]:
  169. """Download a file from ``filepath``.
  170. ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
  171. can be called with ``with`` statement, and when exists from the
  172. ``with`` statement, the temporary path will be released.
  173. Args:
  174. filepath (str): Download a file from ``filepath``.
  175. Examples:
  176. >>> storage = OSSStorage()
  177. >>> # After existing from the ``with`` clause,
  178. >>> # the path will be removed
  179. >>> with storage.get_local_path('http://path/to/file') as path:
  180. ... # do something here
  181. """
  182. try:
  183. f = tempfile.NamedTemporaryFile(delete=False)
  184. f.write(self.read(filepath))
  185. f.close()
  186. yield f.name
  187. finally:
  188. os.remove(f.name)
  189. def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
  190. raise NotImplementedError(
  191. 'OSSStorage.write to be implemented in the future')
  192. def write_text(self,
  193. obj: str,
  194. filepath: Union[str, Path],
  195. encoding: str = 'utf-8') -> None:
  196. raise NotImplementedError(
  197. 'OSSStorage.write_text to be implemented in the future')
  198. G_STORAGES = {}
  199. class File(object):
  200. _prefix_to_storage: dict = {
  201. 'oss': OSSStorage,
  202. 'http': HTTPStorage,
  203. 'https': HTTPStorage,
  204. 'local': LocalStorage,
  205. }
  206. @staticmethod
  207. def _get_storage(uri):
  208. assert isinstance(uri,
  209. str), f'uri should be str type, but got {type(uri)}'
  210. if '://' not in uri:
  211. # local path
  212. storage_type = 'local'
  213. else:
  214. prefix, _ = uri.split('://')
  215. storage_type = prefix
  216. assert storage_type in File._prefix_to_storage, \
  217. f'Unsupported uri {uri}, valid prefixs: '\
  218. f'{list(File._prefix_to_storage.keys())}'
  219. if storage_type not in G_STORAGES:
  220. G_STORAGES[storage_type] = File._prefix_to_storage[storage_type]()
  221. return G_STORAGES[storage_type]
  222. @staticmethod
  223. def read(uri: str) -> bytes:
  224. """Read data from a given ``filepath`` with 'rb' mode.
  225. Args:
  226. filepath (str or Path): Path to read data.
  227. Returns:
  228. bytes: Expected bytes object.
  229. """
  230. storage = File._get_storage(uri)
  231. return storage.read(uri)
  232. @staticmethod
  233. def read_text(uri: Union[str, Path], encoding: str = 'utf-8') -> str:
  234. """Read data from a given ``filepath`` with 'r' mode.
  235. Args:
  236. filepath (str or Path): Path to read data.
  237. encoding (str): The encoding format used to open the ``filepath``.
  238. Default: 'utf-8'.
  239. Returns:
  240. str: Expected text reading from ``filepath``.
  241. """
  242. storage = File._get_storage(uri)
  243. return storage.read_text(uri)
  244. @staticmethod
  245. def write(obj: bytes, uri: Union[str, Path]) -> None:
  246. """Write data to a given ``filepath`` with 'wb' mode.
  247. Note:
  248. ``write`` will create a directory if the directory of ``filepath``
  249. does not exist.
  250. Args:
  251. obj (bytes): Data to be written.
  252. filepath (str or Path): Path to write data.
  253. """
  254. storage = File._get_storage(uri)
  255. return storage.write(obj, uri)
  256. @staticmethod
  257. def write_text(obj: str, uri: str, encoding: str = 'utf-8') -> None:
  258. """Write data to a given ``filepath`` with 'w' mode.
  259. Note:
  260. ``write_text`` will create a directory if the directory of
  261. ``filepath`` does not exist.
  262. Args:
  263. obj (str): Data to be written.
  264. filepath (str or Path): Path to write data.
  265. encoding (str): The encoding format used to open the ``filepath``.
  266. Default: 'utf-8'.
  267. """
  268. storage = File._get_storage(uri)
  269. return storage.write_text(obj, uri)
  270. @contextlib.contextmanager
  271. def as_local_path(uri: str) -> Generator[Union[str, Path], None, None]:
  272. """Only for unified API and do nothing."""
  273. storage = File._get_storage(uri)
  274. with storage.as_local_path(uri) as local_path:
  275. yield local_path