composer.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. from __future__ import annotations
  2. import warnings
  3. from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning
  4. from ruamel.yaml.compat import nprint, nprintf # NOQA
  5. from ruamel.yaml.events import (
  6. StreamStartEvent,
  7. StreamEndEvent,
  8. MappingStartEvent,
  9. MappingEndEvent,
  10. SequenceStartEvent,
  11. SequenceEndEvent,
  12. AliasEvent,
  13. ScalarEvent,
  14. )
  15. from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode
  16. if False: # MYPY
  17. from typing import Any, Dict, Optional, List # NOQA
  18. __all__ = ['Composer', 'ComposerError', 'MaxDepthExceededError']
  class ComposerError(MarkedYAMLError):
      """Error raised by the composer; adds nothing to MarkedYAMLError."""
  class MaxDepthExceededError(MarkedYAMLError):
      """Error raised when node nesting exceeds the loader's ``max_depth``."""
  class Composer:
      """Turn the parser's event stream into a tree of nodes.

      Events are read from ``loader``'s parser and converted into
      ``ScalarNode``, ``SequenceNode`` and ``MappingNode`` objects. Anchored
      nodes are remembered in ``self.anchors`` (reset for every document) so
      that alias events can be resolved to the node they refer to.
      ``self.depth`` tracks the current nesting level and is compared with
      ``loader.max_depth``.
      """

      def __init__(self, loader: Any = None) -> None:
          """Store ``loader`` and register this composer on it if it has none."""
          self.loader = loader
          if self.loader is not None and getattr(self.loader, '_composer', None) is None:
              self.loader._composer = self
          # anchor name -> node, for the document currently being composed
          self.anchors: Dict[Any, Any] = {}
          # when true, re-using an anchor name emits a ReusedAnchorWarning
          self.warn_double_anchors = True
          # current nesting level of compose_node() calls
          self.depth = 0

      @property
      def parser(self) -> Any:
          """Return ``loader._parser``."""
          if hasattr(self.loader, 'typ'):
              # Bare attribute access, evaluated for its side effect only;
              # presumably this makes the loader create its parser - TODO confirm.
              self.loader.parser
          return self.loader._parser

      @property
      def resolver(self) -> Any:
          """Return ``loader._resolver``."""
          # assert self.loader._resolver is not None
          if hasattr(self.loader, 'typ'):
              # Bare attribute access, evaluated for its side effect only;
              # presumably this makes the loader create its resolver - TODO confirm.
              self.loader.resolver
          return self.loader._resolver

      def check_node(self) -> Any:
          """Return True if the stream still holds a document to compose."""
          # Drop the STREAM-START event.
          if self.parser.check_event(StreamStartEvent):
              self.parser.get_event()
          # Anything other than STREAM-END means another document is available.
          return not self.parser.check_event(StreamEndEvent)

      def get_node(self) -> Any:
          """Return the root node of the next document, or None at stream end."""
          if not self.parser.check_event(StreamEndEvent):
              return self.compose_document()
          return None

      def get_single_node(self) -> Any:
          """Compose the only document of the stream and return its root node.

          Returns None when the stream contains no document.

          Raises:
              ComposerError: if a second document follows the first one.
          """
          # Drop the STREAM-START event.
          self.parser.get_event()
          # Compose a document if the stream is not empty.
          document: Any = None
          if not self.parser.check_event(StreamEndEvent):
              document = self.compose_document()
          # Ensure that the stream contains no more documents.
          if not self.parser.check_event(StreamEndEvent):
              event = self.parser.get_event()
              raise ComposerError(
                  'expected a single document in the stream',
                  document.start_mark,
                  'but found another document',
                  event.start_mark,
              )
          # Drop the STREAM-END event.
          self.parser.get_event()
          return document

      def compose_document(self: Any) -> Any:
          """Compose one document and return its root node.

          Anchors are scoped to a document, so the anchor table is cleared first.
          """
          self.anchors = {}
          # Drop the DOCUMENT-START event.
          self.parser.get_event()
          # Compose the root node.
          node = self.compose_node(None, None)
          # Drop the DOCUMENT-END event.
          self.parser.get_event()
          return node

      def return_alias(self, a: Any) -> Any:
          """Return the node to use for an alias; here the anchored node itself."""
          return a

      def compose_node(self, parent: Any, index: Any) -> Any:
          """Compose the node starting at the next event.

          Args:
              parent: the node containing the one being composed (None for
                  the document root); passed to the resolver.
              index: position in a sequence, the key node for a mapping value,
                  or None for a mapping key / the root; passed to the resolver.

          Returns:
              The composed node, or for an alias event the result of
              ``return_alias`` on the previously anchored node.

          Raises:
              ComposerError: on an alias to an undefined anchor, or when the
                  next event does not start a scalar, sequence or mapping.
              MaxDepthExceededError: when nesting exceeds ``loader.max_depth``.
          """
          if self.parser.check_event(AliasEvent):
              event = self.parser.get_event()
              alias = event.anchor
              if alias not in self.anchors:
                  raise ComposerError(
                      None, None, f'found undefined alias {alias!r}', event.start_mark,
                  )
              return self.return_alias(self.anchors[alias])
          self.depth += 1
          try:
              event = self.parser.peek_event()
              if self.loader.max_depth and self.depth > self.loader.max_depth:
                  raise MaxDepthExceededError(
                      None,
                      None,
                      f'maximum depth of data structure exceeded ({self.depth}), '
                      'if necessary increase YAML().max_depth',
                      event.start_mark,
                  )
              anchor = event.anchor
              if anchor is not None:  # have an anchor
                  if self.warn_double_anchors and anchor in self.anchors:
                      ws = (
                          f'\nfound duplicate anchor {anchor!r}\n'
                          f'first occurrence {self.anchors[anchor].start_mark}\n'
                          f'second occurrence {event.start_mark}'
                      )
                      warnings.warn(ws, ReusedAnchorWarning, stacklevel=2)
              self.resolver.descend_resolver(parent, index)
              if self.parser.check_event(ScalarEvent):
                  node = self.compose_scalar_node(anchor)
              elif self.parser.check_event(SequenceStartEvent):
                  node = self.compose_sequence_node(anchor)
              elif self.parser.check_event(MappingStartEvent):
                  node = self.compose_mapping_node(anchor)
              else:
                  # Previously this fell through and failed with an
                  # UnboundLocalError on ``node``; report it as a YAML error.
                  raise ComposerError(
                      None,
                      None,
                      'expected a scalar, sequence or mapping node, '
                      f'but found {type(event).__name__}',
                      event.start_mark,
                  )
              self.resolver.ascend_resolver()
              return node
          finally:
              # Always undo the increment, also when composing failed, so that
              # a composer that is used again starts counting from a
              # consistent depth.
              self.depth -= 1

      def compose_scalar_node(self, anchor: Any) -> Any:
          """Consume a scalar event and return the matching ScalarNode.

          When the event carries no tag (or only '!'), the tag is obtained from
          the resolver. The node is registered under ``anchor`` if one is given.
          """
          event = self.parser.get_event()
          tag = event.ctag
          if tag is None or str(tag) == '!':
              tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
              assert not isinstance(tag, str)
          # e.g tag.yaml.org,2002:str
          node = ScalarNode(
              tag,
              event.value,
              event.start_mark,
              event.end_mark,
              style=event.style,
              comment=event.comment,
              anchor=anchor,
          )
          if anchor is not None:
              self.anchors[anchor] = node
          return node

      def compose_sequence_node(self, anchor: Any) -> Any:
          """Consume a sequence (start event through end event) into a SequenceNode.

          The node is registered under ``anchor`` before its items are composed,
          so an alias inside the sequence can refer to the sequence itself.
          """
          start_event = self.parser.get_event()
          tag = start_event.ctag
          if tag is None or str(tag) == '!':
              tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
              assert not isinstance(tag, str)
          node = SequenceNode(
              tag,
              [],
              start_event.start_mark,
              None,  # end mark is filled in once the end event is seen
              flow_style=start_event.flow_style,
              comment=start_event.comment,
              anchor=anchor,
          )
          if anchor is not None:
              self.anchors[anchor] = node
          index = 0
          while not self.parser.check_event(SequenceEndEvent):
              node.value.append(self.compose_node(node, index))
              index += 1
          end_event = self.parser.get_event()
          if node.flow_style is True and end_event.comment is not None:
              if node.comment is not None:
                  # the start event's comment is about to be overwritten
                  x = node.flow_style
                  nprint(
                      f'Warning: unexpected end_event commment in sequence node {x}\n',
                      ' if possible, please report an issue with reproducable data/code',
                  )
              node.comment = end_event.comment
          node.end_mark = end_event.end_mark
          self.check_end_doc_comment(end_event, node)
          return node

      def compose_mapping_node(self, anchor: Any) -> Any:
          """Consume a mapping (start event through end event) into a MappingNode.

          Key/value pairs are appended to ``node.value`` as tuples in stream
          order; duplicate keys are not checked here. The node is registered
          under ``anchor`` before its content is composed.
          """
          start_event = self.parser.get_event()
          tag = start_event.ctag
          if tag is None or str(tag) == '!':
              tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
              assert not isinstance(tag, str)
          node = MappingNode(
              tag,
              [],
              start_event.start_mark,
              None,  # end mark is filled in once the end event is seen
              flow_style=start_event.flow_style,
              comment=start_event.comment,
              anchor=anchor,
          )
          if anchor is not None:
              self.anchors[anchor] = node
          while not self.parser.check_event(MappingEndEvent):
              # a key is composed with index None, its value with the key node
              item_key = self.compose_node(node, None)
              item_value = self.compose_node(node, item_key)
              node.value.append((item_key, item_value))
          end_event = self.parser.get_event()
          if node.flow_style is True and end_event.comment is not None:
              node.comment = end_event.comment
          node.end_mark = end_event.end_mark
          self.check_end_doc_comment(end_event, node)
          return node

      def check_end_doc_comment(self, end_event: Any, node: Any) -> None:
          """Move the second comment slot of ``end_event`` onto ``node``.

          If ``end_event.comment[1]`` is set, it is appended to ``node.comment``
          (created as ``[None, None]`` when absent) and cleared on the event.
          """
          if end_event.comment and end_event.comment[1]:
              # pre comments on an end_event, no following to move to
              if node.comment is None:
                  node.comment = [None, None]
              # NOTE(review): this compares a node with an event class, so it
              # can never fail for nodes; ScalarNode may have been intended.
              assert not isinstance(node, ScalarEvent)
              # this is a post comment on a mapping node, add as third element
              # in the list
              node.comment.append(end_event.comment[1])
              end_event.comment[1] = None