# phoneme_tokenizer.py
  1. import logging
  2. from pathlib import Path
  3. import re
  4. from typing import Iterable
  5. from typing import List
  6. from typing import Optional
  7. from typing import Union
  8. import warnings
  9. # import g2p_en
  10. import jamo
  11. from funasr.text.abs_tokenizer import AbsTokenizer
# All supported grapheme-to-phoneme (g2p) backend identifiers accepted by
# PhonemeTokenizer's ``g2p_type`` argument.  ``None`` selects plain
# space-based splitting (see ``split_by_space``).
g2p_choices = [
    None,
    "g2p_en",
    "g2p_en_no_space",
    "pyopenjtalk",
    "pyopenjtalk_kana",
    "pyopenjtalk_accent",
    "pyopenjtalk_accent_with_pause",
    "pyopenjtalk_prosody",
    "pypinyin_g2p",
    "pypinyin_g2p_phone",
    "espeak_ng_arabic",
    "espeak_ng_german",
    "espeak_ng_french",
    "espeak_ng_spanish",
    "espeak_ng_russian",
    "espeak_ng_greek",
    "espeak_ng_finnish",
    "espeak_ng_hungarian",
    "espeak_ng_dutch",
    "espeak_ng_english_us_vits",
    "espeak_ng_hindi",
    "g2pk",
    "g2pk_no_space",
    "korean_jaso",
    "korean_jaso_no_space",
]
  39. def split_by_space(text) -> List[str]:
  40. if " " in text:
  41. text = text.replace(" ", " <space> ")
  42. return [c.replace("<space>", " ") for c in text.split(" ")]
  43. else:
  44. return text.split(" ")
  45. def pyopenjtalk_g2p(text) -> List[str]:
  46. import pyopenjtalk
  47. # phones is a str object separated by space
  48. phones = pyopenjtalk.g2p(text, kana=False)
  49. phones = phones.split(" ")
  50. return phones
  51. def pyopenjtalk_g2p_accent(text) -> List[str]:
  52. import pyopenjtalk
  53. import re
  54. phones = []
  55. for labels in pyopenjtalk.run_frontend(text)[1]:
  56. p = re.findall(r"\-(.*?)\+.*?\/A:([0-9\-]+).*?\/F:.*?_([0-9]+)", labels)
  57. if len(p) == 1:
  58. phones += [p[0][0], p[0][2], p[0][1]]
  59. return phones
  60. def pyopenjtalk_g2p_accent_with_pause(text) -> List[str]:
  61. import pyopenjtalk
  62. import re
  63. phones = []
  64. for labels in pyopenjtalk.run_frontend(text)[1]:
  65. if labels.split("-")[1].split("+")[0] == "pau":
  66. phones += ["pau"]
  67. continue
  68. p = re.findall(r"\-(.*?)\+.*?\/A:([0-9\-]+).*?\/F:.*?_([0-9]+)", labels)
  69. if len(p) == 1:
  70. phones += [p[0][0], p[0][2], p[0][1]]
  71. return phones
  72. def pyopenjtalk_g2p_kana(text) -> List[str]:
  73. import pyopenjtalk
  74. kanas = pyopenjtalk.g2p(text, kana=True)
  75. return list(kanas)
def pyopenjtalk_g2p_prosody(text: str, drop_unvoiced_vowels: bool = True) -> List[str]:
    """Extract phoneme + prosody symbol sequence from input full-context labels.

    The algorithm is based on `Prosodic features control by symbols as input of
    sequence-to-sequence acoustic modeling for neural TTS`_ with some r9y9's tweaks.

    Args:
        text (str): Input text.
        drop_unvoiced_vowels (bool): whether to drop unvoiced vowels.

    Returns:
        List[str]: List of phoneme + prosody symbols.

    Examples:
        >>> from funasr.text.phoneme_tokenizer import pyopenjtalk_g2p_prosody
        >>> pyopenjtalk_g2p_prosody("こんにちは。")
        ['^', 'k', 'o', '[', 'N', 'n', 'i', 'ch', 'i', 'w', 'a', '$']

    .. _`Prosodic features control by symbols as input of sequence-to-sequence acoustic
        modeling for neural TTS`: https://doi.org/10.1587/transinf.2020EDP7104
    """
    import pyopenjtalk

    labels = pyopenjtalk.run_frontend(text)[1]
    N = len(labels)

    phones = []
    for n in range(N):
        lab_curr = labels[n]

        # current phoneme: the segment between "-" and "+" in the label
        p3 = re.search(r"\-(.*?)\+", lab_curr).group(1)

        # treat unvoiced vowels (upper-case) as normal (lower-case) vowels
        if drop_unvoiced_vowels and p3 in "AEIOU":
            p3 = p3.lower()

        # "sil" only appears at the beginning/end of the utterance:
        # map it to "^" (start) or "$"/"?" (end, depending on question form)
        if p3 == "sil":
            assert n == 0 or n == N - 1
            if n == 0:
                phones.append("^")
            elif n == N - 1:
                # e3 == 1 marks interrogative (question) intonation
                e3 = _numeric_feature_by_regex(r"!(\d+)_", lab_curr)
                if e3 == 0:
                    phones.append("$")
                elif e3 == 1:
                    phones.append("?")
            continue
        elif p3 == "pau":
            # pause token
            phones.append("_")
            continue
        else:
            phones.append(p3)

        # accent type and position info (forward or backward)
        a1 = _numeric_feature_by_regex(r"/A:([0-9\-]+)\+", lab_curr)
        a2 = _numeric_feature_by_regex(r"\+(\d+)\+", lab_curr)
        a3 = _numeric_feature_by_regex(r"\+(\d+)/", lab_curr)

        # number of mora in accent phrase
        f1 = _numeric_feature_by_regex(r"/F:(\d+)_", lab_curr)

        # NOTE(review): labels[n + 1] assumes the final label is always "sil"
        # (handled by `continue` above), so n + 1 never runs past the end.
        a2_next = _numeric_feature_by_regex(r"\+(\d+)\+", labels[n + 1])

        # accent phrase border
        if a3 == 1 and a2_next == 1 and p3 in "aeiouAEIOUNcl":
            phones.append("#")
        # pitch falling
        elif a1 == 0 and a2_next == a2 + 1 and a2 != f1:
            phones.append("]")
        # pitch rising
        elif a2 == 1 and a2_next == 2:
            phones.append("[")

    return phones
  138. def _numeric_feature_by_regex(regex, s):
  139. match = re.search(regex, s)
  140. if match is None:
  141. return -50
  142. return int(match.group(1))
  143. def pypinyin_g2p(text) -> List[str]:
  144. from pypinyin import pinyin
  145. from pypinyin import Style
  146. phones = [phone[0] for phone in pinyin(text, style=Style.TONE3)]
  147. return phones
  148. def pypinyin_g2p_phone(text) -> List[str]:
  149. from pypinyin import pinyin
  150. from pypinyin import Style
  151. from pypinyin.style._utils import get_finals
  152. from pypinyin.style._utils import get_initials
  153. phones = [
  154. p
  155. for phone in pinyin(text, style=Style.TONE3)
  156. for p in [
  157. get_initials(phone[0], strict=True),
  158. get_finals(phone[0], strict=True),
  159. ]
  160. if len(p) != 0
  161. ]
  162. return phones
  163. class G2p_en:
  164. """On behalf of g2p_en.G2p.
  165. g2p_en.G2p isn't pickalable and it can't be copied to the other processes
  166. via multiprocessing module.
  167. As a workaround, g2p_en.G2p is instantiated upon calling this class.
  168. """
  169. def __init__(self, no_space: bool = False):
  170. self.no_space = no_space
  171. self.g2p = None
  172. def __call__(self, text) -> List[str]:
  173. if self.g2p is None:
  174. self.g2p = g2p_en.G2p()
  175. phones = self.g2p(text)
  176. if self.no_space:
  177. # remove space which represents word serapater
  178. phones = list(filter(lambda s: s != " ", phones))
  179. return phones
  180. class G2pk:
  181. """On behalf of g2pk.G2p.
  182. g2pk.G2p isn't pickalable and it can't be copied to the other processes
  183. via multiprocessing module.
  184. As a workaround, g2pk.G2p is instantiated upon calling this class.
  185. """
  186. def __init__(
  187. self, descritive=False, group_vowels=False, to_syl=False, no_space=False
  188. ):
  189. self.descritive = descritive
  190. self.group_vowels = group_vowels
  191. self.to_syl = to_syl
  192. self.no_space = no_space
  193. self.g2p = None
  194. def __call__(self, text) -> List[str]:
  195. if self.g2p is None:
  196. import g2pk
  197. self.g2p = g2pk.G2p()
  198. phones = list(
  199. self.g2p(
  200. text,
  201. descriptive=self.descritive,
  202. group_vowels=self.group_vowels,
  203. to_syl=self.to_syl,
  204. )
  205. )
  206. if self.no_space:
  207. # remove space which represents word serapater
  208. phones = list(filter(lambda s: s != " ", phones))
  209. return phones
  210. class Jaso:
  211. PUNC = "!'(),-.:;?"
  212. SPACE = " "
  213. JAMO_LEADS = "".join([chr(_) for _ in range(0x1100, 0x1113)])
  214. JAMO_VOWELS = "".join([chr(_) for _ in range(0x1161, 0x1176)])
  215. JAMO_TAILS = "".join([chr(_) for _ in range(0x11A8, 0x11C3)])
  216. VALID_CHARS = JAMO_LEADS + JAMO_VOWELS + JAMO_TAILS + PUNC + SPACE
  217. def __init__(self, space_symbol=" ", no_space=False):
  218. self.space_symbol = space_symbol
  219. self.no_space = no_space
  220. def _text_to_jaso(self, line: str) -> List[str]:
  221. jasos = list(jamo.hangul_to_jamo(line))
  222. return jasos
  223. def _remove_non_korean_characters(self, tokens):
  224. new_tokens = [token for token in tokens if token in self.VALID_CHARS]
  225. return new_tokens
  226. def __call__(self, text) -> List[str]:
  227. graphemes = [x for x in self._text_to_jaso(text)]
  228. graphemes = self._remove_non_korean_characters(graphemes)
  229. if self.no_space:
  230. graphemes = list(filter(lambda s: s != " ", graphemes))
  231. else:
  232. graphemes = [x if x != " " else self.space_symbol for x in graphemes]
  233. return graphemes
  234. class Phonemizer:
  235. """Phonemizer module for various languages.
  236. This is wrapper module of https://github.com/bootphon/phonemizer.
  237. You can define various g2p modules by specifying options for phonemizer.
  238. See available options:
  239. https://github.com/bootphon/phonemizer/blob/master/phonemizer/phonemize.py#L32
  240. """
  241. def __init__(
  242. self,
  243. backend,
  244. word_separator: Optional[str] = None,
  245. syllable_separator: Optional[str] = None,
  246. phone_separator: Optional[str] = " ",
  247. strip=False,
  248. split_by_single_token: bool = False,
  249. **phonemizer_kwargs,
  250. ):
  251. # delayed import
  252. from phonemizer.backend import BACKENDS
  253. from phonemizer.separator import Separator
  254. self.separator = Separator(
  255. word=word_separator,
  256. syllable=syllable_separator,
  257. phone=phone_separator,
  258. )
  259. # define logger to suppress the warning in phonemizer
  260. logger = logging.getLogger("phonemizer")
  261. logger.setLevel(logging.ERROR)
  262. self.phonemizer = BACKENDS[backend](
  263. **phonemizer_kwargs,
  264. logger=logger,
  265. )
  266. self.strip = strip
  267. self.split_by_single_token = split_by_single_token
  268. def __call__(self, text) -> List[str]:
  269. tokens = self.phonemizer.phonemize(
  270. [text],
  271. separator=self.separator,
  272. strip=self.strip,
  273. njobs=1,
  274. )[0]
  275. if not self.split_by_single_token:
  276. return tokens.split()
  277. else:
  278. # "a: ab" -> ["a", ":", "<space>", "a", "b"]
  279. # TODO(kan-bayashi): space replacement should be dealt in PhonemeTokenizer
  280. return [c.replace(" ", "<space>") for c in tokens]
  281. class PhonemeTokenizer(AbsTokenizer):
  282. def __init__(
  283. self,
  284. g2p_type: Union[None, str],
  285. non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
  286. space_symbol: str = "<space>",
  287. remove_non_linguistic_symbols: bool = False,
  288. ):
  289. if g2p_type is None:
  290. self.g2p = split_by_space
  291. elif g2p_type == "g2p_en":
  292. self.g2p = G2p_en(no_space=False)
  293. elif g2p_type == "g2p_en_no_space":
  294. self.g2p = G2p_en(no_space=True)
  295. elif g2p_type == "pyopenjtalk":
  296. self.g2p = pyopenjtalk_g2p
  297. elif g2p_type == "pyopenjtalk_kana":
  298. self.g2p = pyopenjtalk_g2p_kana
  299. elif g2p_type == "pyopenjtalk_accent":
  300. self.g2p = pyopenjtalk_g2p_accent
  301. elif g2p_type == "pyopenjtalk_accent_with_pause":
  302. self.g2p = pyopenjtalk_g2p_accent_with_pause
  303. elif g2p_type == "pyopenjtalk_prosody":
  304. self.g2p = pyopenjtalk_g2p_prosody
  305. elif g2p_type == "pypinyin_g2p":
  306. self.g2p = pypinyin_g2p
  307. elif g2p_type == "pypinyin_g2p_phone":
  308. self.g2p = pypinyin_g2p_phone
  309. elif g2p_type == "espeak_ng_arabic":
  310. self.g2p = Phonemizer(
  311. language="ar",
  312. backend="espeak",
  313. with_stress=True,
  314. preserve_punctuation=True,
  315. )
  316. elif g2p_type == "espeak_ng_german":
  317. self.g2p = Phonemizer(
  318. language="de",
  319. backend="espeak",
  320. with_stress=True,
  321. preserve_punctuation=True,
  322. )
  323. elif g2p_type == "espeak_ng_french":
  324. self.g2p = Phonemizer(
  325. language="fr-fr",
  326. backend="espeak",
  327. with_stress=True,
  328. preserve_punctuation=True,
  329. )
  330. elif g2p_type == "espeak_ng_spanish":
  331. self.g2p = Phonemizer(
  332. language="es",
  333. backend="espeak",
  334. with_stress=True,
  335. preserve_punctuation=True,
  336. )
  337. elif g2p_type == "espeak_ng_russian":
  338. self.g2p = Phonemizer(
  339. language="ru",
  340. backend="espeak",
  341. with_stress=True,
  342. preserve_punctuation=True,
  343. )
  344. elif g2p_type == "espeak_ng_greek":
  345. self.g2p = Phonemizer(
  346. language="el",
  347. backend="espeak",
  348. with_stress=True,
  349. preserve_punctuation=True,
  350. )
  351. elif g2p_type == "espeak_ng_finnish":
  352. self.g2p = Phonemizer(
  353. language="fi",
  354. backend="espeak",
  355. with_stress=True,
  356. preserve_punctuation=True,
  357. )
  358. elif g2p_type == "espeak_ng_hungarian":
  359. self.g2p = Phonemizer(
  360. language="hu",
  361. backend="espeak",
  362. with_stress=True,
  363. preserve_punctuation=True,
  364. )
  365. elif g2p_type == "espeak_ng_dutch":
  366. self.g2p = Phonemizer(
  367. language="nl",
  368. backend="espeak",
  369. with_stress=True,
  370. preserve_punctuation=True,
  371. )
  372. elif g2p_type == "espeak_ng_hindi":
  373. self.g2p = Phonemizer(
  374. language="hi",
  375. backend="espeak",
  376. with_stress=True,
  377. preserve_punctuation=True,
  378. )
  379. elif g2p_type == "g2pk":
  380. self.g2p = G2pk(no_space=False)
  381. elif g2p_type == "g2pk_no_space":
  382. self.g2p = G2pk(no_space=True)
  383. elif g2p_type == "espeak_ng_english_us_vits":
  384. # VITS official implementation-like processing
  385. # Reference: https://github.com/jaywalnut310/vits
  386. self.g2p = Phonemizer(
  387. language="en-us",
  388. backend="espeak",
  389. with_stress=True,
  390. preserve_punctuation=True,
  391. strip=True,
  392. word_separator=" ",
  393. phone_separator="",
  394. split_by_single_token=True,
  395. )
  396. elif g2p_type == "korean_jaso":
  397. self.g2p = Jaso(space_symbol=space_symbol, no_space=False)
  398. elif g2p_type == "korean_jaso_no_space":
  399. self.g2p = Jaso(no_space=True)
  400. else:
  401. raise NotImplementedError(f"Not supported: g2p_type={g2p_type}")
  402. self.g2p_type = g2p_type
  403. self.space_symbol = space_symbol
  404. if non_linguistic_symbols is None:
  405. self.non_linguistic_symbols = set()
  406. elif isinstance(non_linguistic_symbols, (Path, str)):
  407. non_linguistic_symbols = Path(non_linguistic_symbols)
  408. try:
  409. with non_linguistic_symbols.open("r", encoding="utf-8") as f:
  410. self.non_linguistic_symbols = set(line.rstrip() for line in f)
  411. except FileNotFoundError:
  412. warnings.warn(f"{non_linguistic_symbols} doesn't exist.")
  413. self.non_linguistic_symbols = set()
  414. else:
  415. self.non_linguistic_symbols = set(non_linguistic_symbols)
  416. self.remove_non_linguistic_symbols = remove_non_linguistic_symbols
  417. def __repr__(self):
  418. return (
  419. f"{self.__class__.__name__}("
  420. f'g2p_type="{self.g2p_type}", '
  421. f'space_symbol="{self.space_symbol}", '
  422. f'non_linguistic_symbols="{self.non_linguistic_symbols}"'
  423. ")"
  424. )
  425. def text2tokens(self, line: str) -> List[str]:
  426. tokens = []
  427. while len(line) != 0:
  428. for w in self.non_linguistic_symbols:
  429. if line.startswith(w):
  430. if not self.remove_non_linguistic_symbols:
  431. tokens.append(line[: len(w)])
  432. line = line[len(w) :]
  433. break
  434. else:
  435. t = line[0]
  436. tokens.append(t)
  437. line = line[1:]
  438. line = "".join(tokens)
  439. tokens = self.g2p(line)
  440. return tokens
  441. def tokens2text(self, tokens: Iterable[str]) -> str:
  442. # phoneme type is not invertible
  443. return "".join(tokens)