| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518 |
- from __future__ import annotations
- import sys
- import os
- import warnings
- import glob
- from importlib import import_module
- import ruamel.yaml
- from ruamel.yaml.error import UnsafeLoaderWarning, YAMLError # NOQA
- from ruamel.yaml.tokens import * # NOQA
- from ruamel.yaml.events import * # NOQA
- from ruamel.yaml.nodes import * # NOQA
- from ruamel.yaml.loader import BaseLoader, SafeLoader, Loader, RoundTripLoader # NOQA
- from ruamel.yaml.dumper import BaseDumper, SafeDumper, Dumper, RoundTripDumper # NOQA
- from ruamel.yaml.compat import StringIO, BytesIO, with_metaclass, nprint, nprintf # NOQA
- from ruamel.yaml.resolver import VersionedResolver, Resolver # NOQA
- from ruamel.yaml.representer import (
- BaseRepresenter,
- SafeRepresenter,
- Representer,
- RoundTripRepresenter,
- )
- from ruamel.yaml.constructor import (
- BaseConstructor,
- SafeConstructor,
- Constructor,
- RoundTripConstructor,
- )
- from ruamel.yaml.loader import Loader as UnsafeLoader # NOQA
- from ruamel.yaml.comments import CommentedMap, CommentedSeq, C_PRE
- from ruamel.yaml.docinfo import DocInfo, version, Version
- from typing import List, Set, Dict, Tuple, Union, Any, Callable, Optional, Text, Type # NOQA
- if False: # MYPY
- from ruamel.yaml.compat import StreamType, StreamTextType, VersionType # NOQA
- from types import TracebackType
- from pathlib import Path
- try:
- from _ruamel_yaml import CParser, CEmitter # type: ignore
- except: # NOQA
- CParser = CEmitter = None
- # import io
- # YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a
- # subset of abbreviations, which should be all caps according to PEP8
- class YAML:
def __init__(
    self: Any,
    *,
    typ: Optional[Union[List[Text], Text]] = None,
    pure: Any = False,
    output: Any = None,
    plug_ins: Any = None,
) -> None:  # input=None,
    """
    Select the component classes for the requested typ and set all defaults.

    typ: 'rt'/None -> RoundTripLoader/RoundTripDumper,  (default)
         'safe'    -> SafeLoader/SafeDumper,
         'unsafe'  -> normal/unsafe Loader/Dumper (pending deprecation)
         'full'    -> full Dumper only, including python built-ins that are
                      potentially unsafe to load
         'base'    -> baseloader
         'rtsc'    -> round-trip classes using RoundTripScannerSC and
                      RoundTripParserSC, with comment_handling set to C_PRE
    pure: if True only use Python modules
    input/output: needed to work as context manager
    plug_ins: a list of plug-in files

    A typ that is none of the above is set up like 'rt' and must be claimed by
    a plug-in module (one whose `typ` attribute is in self.typ); otherwise
    NotImplementedError is raised.
    """
    if typ is None:
        self.typ = ['rt']
    elif isinstance(typ, list):
        self.typ = typ
    else:
        self.typ = [typ]
    self.pure = pure
    # self._input = input
    self._output = output
    self._context_manager: Any = None

    # explicitly requested plug-ins first, then those found next to this file;
    # path separators become dots so the result is an importable module name
    requested = [] if plug_ins is None else plug_ins
    self.plug_ins: List[Any] = []
    for plug_in_path in requested + self.official_plug_ins():
        self.plug_ins.append(import_module(plug_in_path.replace(os.sep, '.')))

    self.Resolver: Any = ruamel.yaml.resolver.VersionedResolver
    self.allow_unicode = True
    self.Reader: Any = None
    self.Representer: Any = None
    self.Constructor: Any = None
    self.Scanner: Any = None
    self.Serializer: Any = None
    self.default_flow_style: Any = None
    self.comment_handling = None
    self.max_depth = 0

    def _c_or_pure(c_cls: Any, pure_cls: Any) -> Any:
        # the C class is used only when it could be imported and pure is false
        return pure_cls if pure or c_cls is None else c_cls

    recognised = True  # becomes False for a typ that only a plug-in can provide
    round_trip = False
    if 'rt' in self.typ:
        round_trip = True
    elif 'safe' in self.typ:
        self.Emitter = _c_or_pure(CEmitter, ruamel.yaml.emitter.Emitter)
        self.Representer = ruamel.yaml.representer.SafeRepresenter
        self.Parser = _c_or_pure(CParser, ruamel.yaml.parser.Parser)
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.SafeConstructor
    elif 'base' in self.typ:
        self.Emitter = ruamel.yaml.emitter.Emitter
        self.Representer = ruamel.yaml.representer.BaseRepresenter
        self.Parser = _c_or_pure(CParser, ruamel.yaml.parser.Parser)
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.BaseConstructor
    elif 'unsafe' in self.typ:
        warnings.warn(
            "\nyou should no longer specify 'unsafe'.\nFor **dumping only** use yaml=YAML(typ='full')\n",  # NOQA
            PendingDeprecationWarning,
            stacklevel=2,
        )
        self.Emitter = _c_or_pure(CEmitter, ruamel.yaml.emitter.Emitter)
        self.Representer = ruamel.yaml.representer.Representer
        self.Parser = _c_or_pure(CParser, ruamel.yaml.parser.Parser)
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.Constructor
    elif 'full' in self.typ:
        # dumping only: no Composer is set and Constructor stays None
        self.Emitter = _c_or_pure(CEmitter, ruamel.yaml.emitter.Emitter)
        self.Representer = ruamel.yaml.representer.Representer
        self.Parser = _c_or_pure(CParser, ruamel.yaml.parser.Parser)
        # self.Composer = ruamel.yaml.composer.Composer
        # self.Constructor = ruamel.yaml.constructor.Constructor
    elif 'rtsc' in self.typ:
        self.default_flow_style = False
        # no optimized rt-dumper yet
        self.Emitter = ruamel.yaml.emitter.RoundTripEmitter
        self.Serializer = ruamel.yaml.serializer.Serializer
        self.Representer = ruamel.yaml.representer.RoundTripRepresenter
        self.Scanner = ruamel.yaml.scanner.RoundTripScannerSC
        # no optimized rt-parser yet
        self.Parser = ruamel.yaml.parser.RoundTripParserSC
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.RoundTripConstructor
        self.comment_handling = C_PRE
    else:
        round_trip = True
        recognised = False

    if round_trip:
        self.default_flow_style = False
        # no optimized rt-dumper yet
        self.Emitter = ruamel.yaml.emitter.RoundTripEmitter
        self.Serializer = ruamel.yaml.serializer.Serializer
        self.Representer = ruamel.yaml.representer.RoundTripRepresenter
        self.Scanner = ruamel.yaml.scanner.RoundTripScanner
        # no optimized rt-parser yet
        self.Parser = ruamel.yaml.parser.RoundTripParser
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.RoundTripConstructor

    self.stream = None
    self.canonical = None
    self.old_indent = None
    self.width: Union[int, None] = None
    self.line_break = None

    self.map_indent: Union[int, None] = None
    self.sequence_indent: Union[int, None] = None
    self.sequence_dash_offset: int = 0
    self.compact_seq_seq = None
    self.compact_seq_map = None
    self.sort_base_mapping_type_on_output = None  # default: sort

    self.top_level_colon_align = None
    self.prefix_colon = None
    self._version: Optional[Any] = None
    self.preserve_quotes: Optional[bool] = None
    self.allow_duplicate_keys = False  # duplicate keys in map, set
    self.encoding = 'utf-8'
    self.explicit_start: Union[bool, None] = None
    self.explicit_end: Union[bool, None] = None
    self._tags = None
    self.doc_infos: List[DocInfo] = []
    self.default_style = None
    self.top_level_block_style_scalar_no_indent_error_1_1 = False
    # directives end indicator with single scalar document
    self.scalar_after_indicator: Optional[bool] = None
    # [a, b: 1, c: {d: 2}]  vs. [a, {b: 1}, {c: {d: 2}}]
    self.brace_single_entry_mapping_in_flow_sequence = False

    # only the first plug-in that claims one of the requested typs is initialised
    for module in self.plug_ins:
        if getattr(module, 'typ', None) in self.typ:
            recognised = True
            module.init_typ(self)
            break
    if not recognised:
        raise NotImplementedError(
            f'typ "{self.typ}" not recognised (need to install plug-in?)',
        )
- @property
- def reader(self) -> Any:
- try:
- return self._reader # type: ignore
- except AttributeError:
- self._reader = self.Reader(None, loader=self)
- return self._reader
- @property
- def scanner(self) -> Any:
- try:
- return self._scanner # type: ignore
- except AttributeError:
- if self.Scanner is None:
- raise
- self._scanner = self.Scanner(loader=self)
- return self._scanner
@property
def parser(self) -> Any:
    """
    Return the parser instance, creating it on first access.

    A parser class other than CParser is instantiated with loader=self.
    CParser is instantiated with self._stream, so None is returned for as
    long as no stream has been stored.
    """
    if hasattr(self, '_parser'):
        return self._parser
    if self.Parser is not CParser:
        self._parser = self.Parser(loader=self)
        return self._parser
    if getattr(self, '_stream', None) is None:
        # wait for the stream
        return None
    self._parser = CParser(self._stream)
    return self._parser
- @property
- def composer(self) -> Any:
- attr = '_' + sys._getframe().f_code.co_name
- if not hasattr(self, attr):
- setattr(self, attr, self.Composer(loader=self))
- return getattr(self, attr)
- @property
- def constructor(self) -> Any:
- attr = '_' + sys._getframe().f_code.co_name
- if not hasattr(self, attr):
- if self.Constructor is None:
- if 'full' in self.typ:
- raise YAMLError(
- "\nyou can only use yaml=YAML(typ='full') for dumping\n", # NOQA
- )
- cnst = self.Constructor(preserve_quotes=self.preserve_quotes, loader=self) # type: ignore # NOQA
- cnst.allow_duplicate_keys = self.allow_duplicate_keys
- setattr(self, attr, cnst)
- return getattr(self, attr)
- @property
- def resolver(self) -> Any:
- try:
- rslvr = self._resolver # type: ignore
- except AttributeError:
- rslvr = None
- if rslvr is None or rslvr._loader_version != self.version:
- rslvr = self._resolver = self.Resolver(version=self.version, loader=self)
- return rslvr
@property
def emitter(self) -> Any:
    """
    Return the emitter instance, creating it on first access.

    When self.Emitter is CEmitter nothing is created here and None is
    returned unless an emitter has already been stored on this object.
    """
    if hasattr(self, '_emitter'):
        return self._emitter
    if self.Emitter is CEmitter:
        # wait for the stream
        return None
    new_emitter = self.Emitter(
        None,
        canonical=self.canonical,
        indent=self.old_indent,
        width=self.width,
        allow_unicode=self.allow_unicode,
        line_break=self.line_break,
        prefix_colon=self.prefix_colon,
        brace_single_entry_mapping_in_flow_sequence=self.brace_single_entry_mapping_in_flow_sequence,  # NOQA
        dumper=self,
    )
    self._emitter = new_emitter
    # settings left at None keep whatever default the emitter itself has
    overrides = (
        ('best_map_indent', self.map_indent),
        ('best_sequence_indent', self.sequence_indent),
        ('sequence_dash_offset', self.sequence_dash_offset),
        # _emitter.block_seq_indent = self.sequence_dash_offset
        ('compact_seq_seq', self.compact_seq_seq),
        ('compact_seq_map', self.compact_seq_map),
    )
    for target, value in overrides:
        if value is not None:
            setattr(new_emitter, target, value)
    return self._emitter
- @property
- def serializer(self) -> Any:
- attr = '_' + sys._getframe().f_code.co_name
- if not hasattr(self, attr):
- setattr(
- self,
- attr,
- self.Serializer(
- encoding=self.encoding,
- explicit_start=self.explicit_start,
- explicit_end=self.explicit_end,
- version=self.version,
- tags=self.tags,
- dumper=self,
- ),
- )
- return getattr(self, attr)
- @property
- def representer(self) -> Any:
- attr = '_' + sys._getframe().f_code.co_name
- if not hasattr(self, attr):
- repres = self.Representer(
- default_style=self.default_style,
- default_flow_style=self.default_flow_style,
- dumper=self,
- )
- if self.sort_base_mapping_type_on_output is not None:
- repres.sort_base_mapping_type_on_output = self.sort_base_mapping_type_on_output
- setattr(self, attr, repres)
- return getattr(self, attr)
def scan(self, stream: StreamTextType) -> Any:
    """
    Scan a YAML stream and produce scanning tokens.

    This is a generator. `stream` is either an object with a read() method or
    an object with an open() method (e.g. a pathlib.Path), which is opened in
    binary mode for the duration of the iteration.
    """
    if not hasattr(stream, 'read') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        with stream.open('rb') as fp:
            # because this function contains `yield`, a `return self.scan(fp)`
            # would end the generator without producing any token; delegate
            yield from self.scan(fp)
        return
    self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    self.tags = {}
    _, parser = self.get_constructor_parser(stream)
    try:
        while self.scanner.check_token():
            yield self.scanner.get_token()
    finally:
        parser.dispose()
        # reset reader and scanner if they exist and support resetting
        for comp in ('reader', 'scanner'):
            try:
                getattr(getattr(self, '_' + comp), f'reset_{comp}')()
            except AttributeError:
                pass
def parse(self, stream: StreamTextType) -> Any:
    """
    Parse a YAML stream and produce parsing events.

    This is a generator. `stream` is either an object with a read() method or
    an object with an open() method (e.g. a pathlib.Path), which is opened in
    binary mode for the duration of the iteration.
    """
    if not hasattr(stream, 'read') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        with stream.open('rb') as fp:
            # because this function contains `yield`, a `return self.parse(fp)`
            # would end the generator without producing any event; delegate
            yield from self.parse(fp)
        return
    self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    self.tags = {}
    _, parser = self.get_constructor_parser(stream)
    try:
        while parser.check_event():
            yield parser.get_event()
    finally:
        parser.dispose()
        # reset reader and scanner if they exist and support resetting
        for comp in ('reader', 'scanner'):
            try:
                getattr(getattr(self, '_' + comp), f'reset_{comp}')()
            except AttributeError:
                pass
def compose(self, stream: Union[Path, StreamTextType]) -> Any:
    """
    Parse the first YAML document in a stream
    and produce the corresponding representation tree.

    An object without read() but with open() (e.g. a pathlib.Path) is opened
    in binary mode and composed from the resulting file object.
    """
    needs_opening = not hasattr(stream, 'read') and hasattr(stream, 'open')
    if needs_opening:
        # pathlib.Path() instance
        with stream.open('rb') as fp:
            return self.compose(fp)

    self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    self.tags = {}
    constructor, parser = self.get_constructor_parser(stream)
    try:
        node = constructor.composer.get_single_node()
        return node
    finally:
        parser.dispose()
        # reset reader and scanner if they exist and support resetting
        for comp in ('reader', 'scanner'):
            component = getattr(self, '_' + comp, None)
            try:
                getattr(component, f'reset_{comp}')()
            except AttributeError:
                pass
def compose_all(self, stream: Union[Path, StreamTextType]) -> Any:
    """
    Parse all YAML documents in a stream
    and produce corresponding representation trees.

    This is a generator. An object without read() but with open() (e.g. a
    pathlib.Path, as allowed by the annotation) is opened in binary mode for
    the duration of the iteration.
    """
    if not hasattr(stream, 'read') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        with stream.open('rb') as fp:
            yield from self.compose_all(fp)
        return
    self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    self.tags = {}
    constructor, parser = self.get_constructor_parser(stream)
    try:
        while constructor.composer.check_node():
            yield constructor.composer.get_node()
    finally:
        parser.dispose()
        # reset reader and scanner if they exist and support resetting
        for comp in ('reader', 'scanner'):
            try:
                getattr(getattr(self, '_' + comp), f'reset_{comp}')()
            except AttributeError:
                pass
- # separate output resolver?
- # def load(self, stream=None):
- # if self._context_manager:
- # if not self._input:
- # raise TypeError("Missing input stream while dumping from context manager")
- # for data in self._context_manager.load():
- # yield data
- # return
- # if stream is None:
- # raise TypeError("Need a stream argument when not loading from context manager")
- # return self.load_one(stream)
def load(self, stream: Union[Path, StreamTextType]) -> Any:
    """
    Load a single document from stream and return the constructed data.

    at this point you either have the non-pure Parser (which has its own reader and
    scanner) or you have the pure Parser.
    If the pure Parser is set, then set the Reader and Scanner, if not already set.
    If either the Scanner or Reader are set, you cannot use the non-pure Parser,
        so reset it to the pure parser and set the Reader resp. Scanner if necessary

    An object without read() but with open() (e.g. a pathlib.Path) is opened
    in binary mode and loaded from the resulting file object.
    """
    needs_opening = not hasattr(stream, 'read') and hasattr(stream, 'open')
    if needs_opening:
        # pathlib.Path() instance
        with stream.open('rb') as fp:
            return self.load(fp)

    self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    self.tags = {}
    constructor, parser = self.get_constructor_parser(stream)
    try:
        data = constructor.get_single_data()
        return data
    finally:
        parser.dispose()
        # reset reader and scanner if they exist and support resetting
        for comp in ('reader', 'scanner'):
            component = getattr(self, '_' + comp, None)
            try:
                getattr(component, f'reset_{comp}')()
            except AttributeError:
                pass
def load_all(self, stream: Union[Path, StreamTextType]) -> Any:  # *, skip=None):
    """
    Generate the constructed data of every document in stream.

    An object without read() but with open() (e.g. a pathlib.Path) is opened
    in text mode ('r') for the duration of the iteration. A DocInfo entry is
    appended to self.doc_infos before the first document and again after each
    document that is produced.
    """
    if not hasattr(stream, 'read') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        with stream.open('r') as fp:
            yield from self.load_all(fp)
        return
    # if skip is None:
    #     skip = []
    # elif isinstance(skip, int):
    #     skip = [skip]
    self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    self.tags = {}
    constructor, parser = self.get_constructor_parser(stream)
    try:
        while constructor.check_data():
            yield constructor.get_data()
            self.doc_infos.append(DocInfo(requested_version=version(self.version)))
    finally:
        parser.dispose()
        # reset reader and scanner if they exist and support resetting
        for comp in ('reader', 'scanner'):
            component = getattr(self, '_' + comp, None)
            try:
                getattr(component, f'reset_{comp}')()
            except AttributeError:
                pass
def get_constructor_parser(self, stream: StreamTextType) -> Any:
    """
    Return the (constructor, parser) pair to use for loading from stream.

    the old cyaml needs special setup, and therefore the stream

    With CParser selected and neither Reader nor Scanner set, one combined
    loader object is built and returned for both positions. In every other
    case missing Reader/Scanner classes are filled in with the pure Python
    ones, CParser (if selected) is replaced by the pure Parser, and the stream
    is handed to the reader.

    Raises YAMLError when no Constructor class is set and the typ includes
    'full' (dumping only).
    """
    if self.Constructor is None and 'full' in self.typ:
        raise YAMLError(
            "\nyou can only use yaml=YAML(typ='full') for dumping\n",  # NOQA
        )

    uses_c_parser = self.Parser is CParser
    if uses_c_parser and self.Reader is None and self.Scanner is None:
        # combined C level reader>scanner>parser
        # does some calls to the resolver, e.g. BaseResolver.descend_resolver
        # if you just initialise the CParser, too much of resolver.py
        # is actually used
        rslvr = self.Resolver
        # if rslvr is ruamel.yaml.resolver.VersionedResolver:
        #     rslvr = ruamel.yaml.resolver.Resolver

        class XLoader(self.Parser, self.Constructor, rslvr):  # type: ignore
            def __init__(
                selfx,
                stream: StreamTextType,
                version: Optional[VersionType] = self.version,
                preserve_quotes: Optional[bool] = None,
            ) -> None:
                # NOQA
                CParser.__init__(selfx, stream)
                # the one object plays the parser and the composer role
                selfx._parser = selfx._composer = selfx
                self.Constructor.__init__(selfx, loader=selfx)
                selfx.allow_duplicate_keys = self.allow_duplicate_keys
                rslvr.__init__(selfx, version=version, loadumper=selfx)

        self._stream = stream
        loader = XLoader(stream)
        self._scanner = loader
        return loader, loader

    # a Reader or Scanner is (or will be) set, so the pure parser is needed
    if self.Reader is None:
        self.Reader = ruamel.yaml.reader.Reader
    if self.Scanner is None:
        self.Scanner = ruamel.yaml.scanner.Scanner
    if uses_c_parser:
        self.Parser = ruamel.yaml.parser.Parser
    self.reader.stream = stream
    return self.constructor, self.parser
- def emit(self, events: Any, stream: Any) -> None:
- """
- Emit YAML parsing events into a stream.
- If stream is None, return the produced string instead.
- """
- _, _, emitter = self.get_serializer_representer_emitter(stream, None)
- try:
- for event in events:
- emitter.emit(event)
- finally:
- try:
- emitter.dispose()
- except AttributeError:
- raise
- def serialize(self, node: Any, stream: Optional[StreamType]) -> Any:
- """
- Serialize a representation tree into a YAML stream.
- If stream is None, return the produced string instead.
- """
- self.serialize_all([node], stream)
- def serialize_all(self, nodes: Any, stream: Optional[StreamType]) -> Any:
- """
- Serialize a sequence of representation trees into a YAML stream.
- If stream is None, return the produced string instead.
- """
- serializer, _, emitter = self.get_serializer_representer_emitter(stream, None)
- try:
- serializer.open()
- for node in nodes:
- serializer.serialize(node)
- serializer.close()
- finally:
- try:
- emitter.dispose()
- except AttributeError:
- raise
- def dump(
- self: Any, data: Union[Path, StreamType], stream: Any = None, *, transform: Any = None,
- ) -> Any:
- if self._context_manager:
- if not self._output:
- raise TypeError('Missing output stream while dumping from context manager')
- if transform is not None:
- x = self.__class__.__name__
- raise TypeError(
- f'{x}.dump() in the context manager cannot have transform keyword',
- )
- self._context_manager.dump(data)
- else: # old style
- if stream is None:
- raise TypeError('Need a stream argument when not dumping from context manager')
- return self.dump_all([data], stream, transform=transform)
def dump_all(
    self, documents: Any, stream: Union[Path, StreamType], *, transform: Any = None,
) -> Any:
    """
    Dump every item of documents to stream using a temporary context manager.

    Raises NotImplementedError when a context manager is already active.
    The temporary output and context manager are cleared again when done,
    also when dumping raises, so that a failed dump does not leave this
    object looking as if a context manager were still active.
    """
    if self._context_manager:
        raise NotImplementedError
    self._output = stream
    try:
        self._context_manager = YAMLContextManager(self, transform=transform)
        for data in documents:
            self._context_manager.dump(data)
        self._context_manager.teardown_output()
    finally:
        self._output = None
        self._context_manager = None
- def Xdump_all(self, documents: Any, stream: Any, *, transform: Any = None) -> Any:
- """
- Serialize a sequence of Python objects into a YAML stream.
- """
- if not hasattr(stream, 'write') and hasattr(stream, 'open'):
- # pathlib.Path() instance
- with stream.open('w') as fp:
- return self.dump_all(documents, fp, transform=transform)
- # The stream should have the methods `write` and possibly `flush`.
- if self.top_level_colon_align is True:
- tlca: Any = max([len(str(x)) for x in documents[0]])
- else:
- tlca = self.top_level_colon_align
- if transform is not None:
- fstream = stream
- if self.encoding is None:
- stream = StringIO()
- else:
- stream = BytesIO()
- serializer, representer, emitter = self.get_serializer_representer_emitter(
- stream, tlca,
- )
- try:
- self.serializer.open()
- for data in documents:
- try:
- self.representer.represent(data)
- except AttributeError:
- # nprint(dir(dumper._representer))
- raise
- self.serializer.close()
- finally:
- try:
- self.emitter.dispose()
- except AttributeError:
- raise
- # self.dumper.dispose() # cyaml
- delattr(self, '_serializer') # NOQA
- delattr(self, '_emitter') # NOQA
- if transform:
- val = stream.getvalue()
- if self.encoding:
- val = val.decode(self.encoding)
- if fstream is None:
- transform(val)
- else:
- fstream.write(transform(val))
- return None
def get_serializer_representer_emitter(self, stream: StreamType, tlca: Any) -> Any:
    """
    Return the (serializer, representer, emitter) triple for dumping to stream.

    With CEmitter selected and no Serializer class set, a single combined
    dumper object is built and returned in all three positions. Otherwise the
    separate components are used: stream, tlca (top level colon align) and,
    if set, scalar_after_indicator are stored on the emitter.
    """
    # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler
    if self.Emitter is CEmitter and self.Serializer is not None:
        # cannot set serializer with CEmitter
        self.Emitter = ruamel.yaml.emitter.Emitter
    if self.Emitter is not CEmitter:
        if self.Serializer is None:
            self.Serializer = ruamel.yaml.serializer.Serializer
        active_emitter = self.emitter
        active_emitter.stream = stream
        active_emitter.top_level_colon_align = tlca
        if self.scalar_after_indicator is not None:
            active_emitter.scalar_after_indicator = self.scalar_after_indicator
        return self.serializer, self.representer, self.emitter

    # C routines
    if 'base' in self.typ:
        rslvr = ruamel.yaml.resolver.BaseResolver
    else:
        rslvr = ruamel.yaml.resolver.Resolver

    class XDumper(CEmitter, self.Representer, rslvr):  # type: ignore
        def __init__(
            selfx: StreamType,
            stream: Any,
            default_style: Any = None,
            default_flow_style: Any = None,
            canonical: Optional[bool] = None,
            indent: Optional[int] = None,
            width: Optional[int] = None,
            allow_unicode: Optional[bool] = None,
            line_break: Any = None,
            encoding: Any = None,
            explicit_start: Optional[bool] = None,
            explicit_end: Optional[bool] = None,
            version: Any = None,
            tags: Any = None,
            block_seq_indent: Any = None,
            top_level_colon_align: Any = None,
            prefix_colon: Any = None,
        ) -> None:
            # NOQA
            CEmitter.__init__(
                selfx,
                stream,
                canonical=canonical,
                indent=indent,
                width=width,
                encoding=encoding,
                allow_unicode=allow_unicode,
                line_break=line_break,
                explicit_start=explicit_start,
                explicit_end=explicit_end,
                version=version,
                tags=tags,
            )
            # the one object plays the emitter, serializer and representer role
            selfx._emitter = selfx._serializer = selfx._representer = selfx
            self.Representer.__init__(
                selfx, default_style=default_style, default_flow_style=default_flow_style,
            )
            rslvr.__init__(selfx)

    self._stream = stream
    dumper = XDumper(
        stream,
        default_style=self.default_style,
        default_flow_style=self.default_flow_style,
        canonical=self.canonical,
        indent=self.old_indent,
        width=self.width,
        allow_unicode=self.allow_unicode,
        line_break=self.line_break,
        encoding=self.encoding,
        explicit_start=self.explicit_start,
        explicit_end=self.explicit_end,
        version=self.version,
        tags=self.tags,
    )
    self._emitter = self._serializer = dumper
    return dumper, dumper, dumper
- # basic types
- def map(self, **kw: Any) -> Any:
- if 'rt' in self.typ:
- return CommentedMap(**kw)
- else:
- return dict(**kw)
- def seq(self, *args: Any) -> Any:
- if 'rt' in self.typ:
- return CommentedSeq(*args)
- else:
- return list(*args)
- # helpers
def official_plug_ins(self) -> Any:
    """search for list of subdirs that are plug-ins, if __file__ is not available, e.g.
    single file installers that are not properly emulating a file-system (issue 324)
    no plug-ins will be found. If any are packaged, you know which file that are
    and you can explicitly provide it during instantiation:
        yaml = ruamel.yaml.YAML(plug_ins=['ruamel/yaml/jinja2/__plug_in__'])

    Returns the paths of all <dir of this file>/*/__plug_in__.py files, with
    the directory two levels above this file's directory, the character
    following it and the trailing '.py' removed.
    """
    try:
        package_dir = os.path.dirname(__file__)
    except NameError:
        return []
    strip_prefix = os.path.dirname(os.path.dirname(package_dir))
    found = []
    for plug_in_file in glob.glob(package_dir + '/*/__plug_in__.py'):
        # [1:-3] drops the separator left in front and the '.py' at the end
        found.append(plug_in_file.replace(strip_prefix, "")[1:-3])
    return found
- def register_class(self, cls: Any) -> Any:
- """
- register a class for dumping/loading
- - if it has attribute yaml_tag use that to register, else use class name
- - if it has methods to_yaml/from_yaml use those to dump/load else dump attributes
- as mapping
- """
- tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)
- try:
- self.representer.add_representer(cls, cls.to_yaml)
- except AttributeError:
- def t_y(representer: Any, data: Any) -> Any:
- return representer.represent_yaml_object(
- tag, data, cls, flow_style=representer.default_flow_style,
- )
- self.representer.add_representer(cls, t_y)
- try:
- self.constructor.add_constructor(tag, cls.from_yaml)
- except AttributeError:
- def f_y(constructor: Any, node: Any) -> Any:
- return constructor.construct_yaml_object(node, cls)
- self.constructor.add_constructor(tag, f_y)
- return cls
- # ### context manager
def __enter__(self) -> Any:
    """Create the context manager that handles dumping for the `with` block; return self."""
    manager = YAMLContextManager(self)
    self._context_manager = manager
    return self
- def __exit__(
- self,
- typ: Optional[Type[BaseException]],
- value: Optional[BaseException],
- traceback: Optional[TracebackType],
- ) -> None:
- if typ:
- nprint('typ', typ)
- self._context_manager.teardown_output()
- # self._context_manager.teardown_input()
- self._context_manager = None
- # ### backwards compatibility
- def _indent(self, mapping: Any = None, sequence: Any = None, offset: Any = None) -> None:
- if mapping is not None:
- self.map_indent = mapping
- if sequence is not None:
- self.sequence_indent = sequence
- if offset is not None:
- self.sequence_dash_offset = offset
- @property
- def version(self) -> Optional[Tuple[int, int]]:
- return self._version
- @version.setter
- def version(self, val: VersionType) -> None:
- if val is None:
- self._version = val
- return
- elif isinstance(val, str):
- sval = tuple(int(x) for x in val.split('.'))
- elif isinstance(val, (list, tuple)):
- sval = tuple(int(x) for x in val)
- elif isinstance(val, Version):
- sval = (val.major, val.minor)
- else:
- raise TypeError(f'unknown version type {type(val)}')
- assert len(sval) == 2, f'version can only have major.minor, got {val}'
- assert sval[0] == 1, f'version major part can only be 1, got {val}'
- assert sval[1] in [1, 2], f'version minor part can only be 2 or 1, got {val}'
- self._version = sval
- @property
- def tags(self) -> Any:
- return self._tags
- @tags.setter
- def tags(self, val: Any) -> None:
- self._tags = val
- @property
- def indent(self) -> Any:
- return self._indent
- @indent.setter
- def indent(self, val: Any) -> None:
- self.old_indent = val
@property
def block_seq_indent(self) -> Any:
    """Alias that reads ``self.sequence_dash_offset``."""
    return self.sequence_dash_offset

@block_seq_indent.setter
def block_seq_indent(self, val: Any) -> None:
    """Alias that writes ``val`` to ``self.sequence_dash_offset``."""
    self.sequence_dash_offset = val
def compact(self, seq_seq: Any = None, seq_map: Any = None) -> None:
    """Set both compact flags at once.

    ``seq_seq`` is stored in ``compact_seq_seq`` and ``seq_map`` in
    ``compact_seq_map``.  Both are always overwritten, so an omitted argument
    resets its flag to ``None``.
    """
    self.compact_seq_seq, self.compact_seq_map = seq_seq, seq_map
class YAMLContextManager:
    """Output-side state for using a YAML instance in a ``with`` block.

    The manager takes the output target from ``yaml._output``.  If that target
    has no ``write`` but does have ``open`` (e.g. a ``pathlib.Path``), it is
    opened for writing here and closed again in ``teardown_output``.  When a
    ``transform`` callable is given, documents are first collected in an
    in-memory buffer and the transformed text is written to the real target on
    teardown.

    Only dumping is implemented; an input (load) counterpart was sketched in
    earlier revisions but never implemented.
    """

    def __init__(self, yaml: Any, transform: Any = None) -> None:
        """Bind to ``yaml`` and prepare (but do not yet initialise) the output.

        Args:
            yaml: object providing ``_output`` and ``encoding`` and, once
                dumping starts, the serializer/representer/emitter accessors.
            transform: optional callable applied to the complete output text
                on teardown.
        """
        # used to be: (Any, Optional[Callable]) -> None
        self._yaml = yaml
        self._output_inited = False
        self._output_path = None
        self._output = self._yaml._output
        self._transform = transform
        # real target stream while output is being buffered for a transform
        self._fstream = None
        if not hasattr(self._output, 'write') and hasattr(self._output, 'open'):
            # pathlib.Path() instance, open with the same mode
            self._output_path = self._output
            self._output = self._output_path.open('w')
        if self._transform is not None:
            self._fstream = self._output
            if self._yaml.encoding is None:
                self._output = StringIO()
            else:
                self._output = BytesIO()

    def teardown_output(self) -> None:
        """Close the serializer/emitter, apply the transform and release the file.

        If nothing was dumped there is no serializer or emitter to shut down,
        but a file that ``__init__`` opened from a path is still closed so the
        handle does not leak.
        """
        if not self._output_inited:
            if self._output_path is not None:
                # with a transform the opened file lives in _fstream, not _output
                opened = self._output if self._fstream is None else self._fstream
                opened.close()
            return
        self._yaml.serializer.close()
        self._yaml.emitter.dispose()
        # self.dumper.dispose()  # cyaml
        delattr(self._yaml, '_serializer')  # NOQA
        delattr(self._yaml, '_emitter')  # NOQA
        if self._transform:
            val = self._output.getvalue()
            if self._yaml.encoding:
                val = val.decode(self._yaml.encoding)
            if self._fstream is None:
                self._transform(val)
            else:
                self._fstream.write(self._transform(val))
                self._fstream.flush()
                self._output = self._fstream  # maybe not necessary
        if self._output_path is not None:
            self._output.close()

    def init_output(self, first_data: Any) -> None:
        """Create serializer/representer/emitter for the output and open the serializer.

        When ``yaml.top_level_colon_align`` is exactly ``True`` the alignment
        column is the longest ``str()`` length among the items obtained by
        iterating ``first_data``; otherwise the configured value is passed on.
        """
        if self._yaml.top_level_colon_align is True:
            tlca: Any = max(len(str(x)) for x in first_data)
        else:
            tlca = self._yaml.top_level_colon_align
        self._yaml.get_serializer_representer_emitter(self._output, tlca)
        self._yaml.serializer.open()
        self._output_inited = True

    def dump(self, data: Any) -> None:
        """Represent ``data`` as the next document, initialising output on first use."""
        if not self._output_inited:
            self.init_output(data)
        self._yaml.representer.represent(data)
def yaml_object(yml: Any) -> Any:
    """Class decorator that registers a class for dumping and loading with ``yml``.

    The tag is the class attribute ``yaml_tag`` when present, otherwise ``'!'``
    followed by the class name (case preserved).

    If the class provides ``to_yaml`` / ``from_yaml`` these are registered on
    ``yml.representer`` / ``yml.constructor``.  When accessing or registering
    one raises ``AttributeError``, a default is registered instead that
    delegates to ``represent_yaml_object`` / ``construct_yaml_object``.
    The decorated class is returned unchanged.
    """

    def yo_deco(cls: Any) -> Any:
        tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)

        # dumping side
        try:
            yml.representer.add_representer(cls, cls.to_yaml)
        except AttributeError:

            def default_to_yaml(representer: Any, data: Any) -> Any:
                style = representer.default_flow_style
                return representer.represent_yaml_object(tag, data, cls, flow_style=style)

            yml.representer.add_representer(cls, default_to_yaml)

        # loading side
        try:
            yml.constructor.add_constructor(tag, cls.from_yaml)
        except AttributeError:

            def default_from_yaml(constructor: Any, node: Any) -> Any:
                return constructor.construct_yaml_object(node, cls)

            yml.constructor.add_constructor(tag, default_from_yaml)
        return cls

    return yo_deco
- ########################################################################################
def warn_deprecation(fun: Any, method: Any, arg: str = '') -> None:
    """Emit a PendingDeprecationWarning steering callers to the YAML() API.

    Args:
        fun: name of the function that is going away.
        method: name of the YAML method to use instead.
        arg: argument text shown inside ``YAML(...)`` in the message.

    ``stacklevel=3`` attributes the warning two frames above this helper.
    """
    message = f'\n{fun} will be removed, use\n\n yaml=YAML({arg})\n yaml.{method}(...)\n\ninstead'  # NOQA
    # PendingDeprecationWarning: this will show when testing with pytest/tox
    warnings.warn(message, PendingDeprecationWarning, stacklevel=3)
def error_deprecation(fun: Any, method: Any, arg: str = '', comment: str = 'instead of') -> None:  # NOQA
    """Always raise AttributeError explaining that ``fun`` was removed.

    The message names the replacement (``YAML(arg)`` plus ``yaml.method(...)``)
    followed by ``comment``.  On a best-effort basis it also appends file, line
    and source text of the frame two levels up the stack; if that lookup fails
    for any reason only a newline is appended.

    Raises:
        AttributeError: unconditionally (with ``name=None`` on Python >= 3.10).
    """
    import inspect

    message = f'\n"{fun}()" has been removed, use\n\n yaml = YAML({arg})\n yaml.{method}(...)\n\n{comment}'  # NOQA
    try:
        caller = inspect.getframeinfo(inspect.stack()[2][0])
        snippet = "".join(caller.code_context) if caller.code_context is not None else ''
        message += f' file "{caller.filename}", line {caller.lineno}\n\n{snippet}'
    except Exception:
        # locating the caller is purely informational; never let it mask the error
        message += '\n'
    if sys.version_info >= (3, 10):
        raise AttributeError(message, name=None)
    raise AttributeError(message)
# Shared pieces of the "has been removed" message used by the removed
# module-level loading functions: the YAML() argument to suggest and the
# closing remark.
_error_dep_arg = "typ='rt'"
_error_dep_comment = (
    'and register any classes that you use, or check the tag attribute on the loaded data,\n'
    'instead of'
)
- ########################################################################################
def scan(stream: StreamTextType, Loader: Any = Loader) -> Any:
    """Removed entry point that used to scan a YAML stream into tokens.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML(typ='rt').scan(...)``.
    """
    error_deprecation(
        fun='scan', method='scan', arg=_error_dep_arg, comment=_error_dep_comment,
    )
def parse(stream: StreamTextType, Loader: Any = Loader) -> Any:
    """Removed entry point that used to parse a YAML stream into events.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML(typ='rt').parse(...)``.
    """
    error_deprecation(
        fun='parse', method='parse', arg=_error_dep_arg, comment=_error_dep_comment,
    )
def compose(stream: StreamTextType, Loader: Any = Loader) -> Any:
    """Removed entry point that used to build the representation tree of the
    first document in a stream.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML(typ='rt').compose(...)``.
    """
    error_deprecation(
        fun='compose', method='compose', arg=_error_dep_arg, comment=_error_dep_comment,
    )
def compose_all(stream: StreamTextType, Loader: Any = Loader) -> Any:
    """Removed entry point that used to build representation trees for all
    documents in a stream.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError``.
    """
    # Report the function that was actually called (the message used to name
    # 'compose').  The suggested replacement method is left as 'compose'.
    # NOTE(review): confirm whether YAML() offers a compose_all() to point at.
    error_deprecation('compose_all', 'compose', arg=_error_dep_arg, comment=_error_dep_comment)
def load(
    stream: Any,
    Loader: Any = None,
    version: Any = None,
    preserve_quotes: Any = None,
) -> Any:
    """Removed entry point that used to load the first document of a stream
    into a Python object.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML(typ='rt').load(...)``.
    """
    error_deprecation(
        fun='load', method='load', arg=_error_dep_arg, comment=_error_dep_comment,
    )
def load_all(
    stream: Any,
    Loader: Any = None,
    version: Any = None,
    preserve_quotes: Any = None,
) -> Any:
    # NOQA
    """Removed entry point that used to load every document of a stream into
    Python objects.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML(typ='rt').load_all(...)``.
    """
    error_deprecation(
        fun='load_all', method='load_all', arg=_error_dep_arg, comment=_error_dep_comment,
    )
def safe_load(stream: StreamTextType, version: Optional[VersionType] = None) -> Any:
    """Removed entry point that used to load the first document of a stream
    while resolving only basic YAML tags.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='safe', pure=True).load(...)``.
    """
    error_deprecation(fun='safe_load', method='load', arg="typ='safe', pure=True")
def safe_load_all(stream: StreamTextType, version: Optional[VersionType] = None) -> Any:
    """Removed entry point that used to load every document of a stream while
    resolving only basic YAML tags.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='safe', pure=True).load_all(...)``.
    """
    error_deprecation(fun='safe_load_all', method='load_all', arg="typ='safe', pure=True")
def round_trip_load(
    stream: StreamTextType,
    version: Optional[VersionType] = None,
    preserve_quotes: Optional[bool] = None,
) -> Any:
    """Removed entry point that used to round-trip load the first document of
    a stream into a Python object.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML().load(...)``.
    """
    # Name this function in the message; it used to report 'round_trip_load_all'.
    error_deprecation('round_trip_load', 'load')
def round_trip_load_all(
    stream: StreamTextType,
    version: Optional[VersionType] = None,
    preserve_quotes: Optional[bool] = None,
) -> Any:
    """Removed entry point that used to round-trip load every document of a
    stream into Python objects.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML().load_all(...)``.
    """
    error_deprecation(fun='round_trip_load_all', method='load_all')
def emit(
    events: Any,
    stream: Optional[StreamType] = None,
    Dumper: Any = Dumper,
    canonical: Optional[bool] = None,
    indent: Union[int, None] = None,
    width: Optional[int] = None,
    allow_unicode: Optional[bool] = None,
    line_break: Any = None,
) -> Any:
    # NOQA
    """Removed entry point that used to emit YAML parsing events into a stream
    (or return the produced string when ``stream`` was None).

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='safe', pure=True).emit(...)``.
    """
    error_deprecation(fun='emit', method='emit', arg="typ='safe', pure=True")
# default value for the ``encoding`` parameter of the removed
# serialize_all/dump_all/dump/round_trip_dump functions below
enc = None
def serialize_all(
    nodes: Any,
    stream: Optional[StreamType] = None,
    Dumper: Any = Dumper,
    canonical: Any = None,
    indent: Optional[int] = None,
    width: Optional[int] = None,
    allow_unicode: Optional[bool] = None,
    line_break: Any = None,
    encoding: Any = enc,
    explicit_start: Optional[bool] = None,
    explicit_end: Optional[bool] = None,
    version: Optional[VersionType] = None,
    tags: Any = None,
) -> Any:
    # NOQA
    """Removed entry point that used to serialize a sequence of representation
    trees into a YAML stream (or return the string when ``stream`` was None).

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='safe', pure=True).serialize_all(...)``.
    """
    error_deprecation(
        fun='serialize_all', method='serialize_all', arg="typ='safe', pure=True",
    )
def serialize(
    node: Any,
    stream: Optional[StreamType] = None,
    Dumper: Any = Dumper,
    **kwds: Any,
) -> Any:
    """Removed entry point that used to serialize one representation tree into
    a YAML stream (or return the string when ``stream`` was None).

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='safe', pure=True).serialize(...)``.
    """
    error_deprecation(fun='serialize', method='serialize', arg="typ='safe', pure=True")
def dump_all(
    documents: Any,
    stream: Optional[StreamType] = None,
    Dumper: Any = Dumper,
    default_style: Any = None,
    default_flow_style: Any = None,
    canonical: Optional[bool] = None,
    indent: Optional[int] = None,
    width: Optional[int] = None,
    allow_unicode: Optional[bool] = None,
    line_break: Any = None,
    encoding: Any = enc,
    explicit_start: Optional[bool] = None,
    explicit_end: Optional[bool] = None,
    version: Any = None,
    tags: Any = None,
    block_seq_indent: Any = None,
    top_level_colon_align: Any = None,
    prefix_colon: Any = None,
) -> Any:
    # NOQA
    """Removed entry point that used to serialize a sequence of Python objects
    into a YAML stream (or return the string when ``stream`` was None).

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='unsafe', pure=True).dump_all(...)``.
    """
    error_deprecation(fun='dump_all', method='dump_all', arg="typ='unsafe', pure=True")
def dump(
    data: Any,
    stream: Optional[StreamType] = None,
    Dumper: Any = Dumper,
    default_style: Any = None,
    default_flow_style: Any = None,
    canonical: Optional[bool] = None,
    indent: Optional[int] = None,
    width: Optional[int] = None,
    allow_unicode: Optional[bool] = None,
    line_break: Any = None,
    encoding: Any = enc,
    explicit_start: Optional[bool] = None,
    explicit_end: Optional[bool] = None,
    version: Optional[VersionType] = None,
    tags: Any = None,
    block_seq_indent: Any = None,
) -> Any:
    # NOQA
    """Removed entry point that used to serialize one Python object into a
    YAML stream (or return the string when ``stream`` was None).

    default_style ∈ None, '', '"', "'", '|', '>'

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='unsafe', pure=True).dump(...)``.
    """
    error_deprecation(fun='dump', method='dump', arg="typ='unsafe', pure=True")
def safe_dump(data: Any, stream: Optional[StreamType] = None, **kwds: Any) -> Any:
    """Removed entry point that used to serialize one Python object into a
    YAML stream using only basic YAML tags.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at
    ``YAML(typ='safe', pure=True).dump(...)``.
    """
    error_deprecation(fun='safe_dump', method='dump', arg="typ='safe', pure=True")
def round_trip_dump(
    data: Any,
    stream: Optional[StreamType] = None,
    Dumper: Any = RoundTripDumper,
    default_style: Any = None,
    default_flow_style: Any = None,
    canonical: Optional[bool] = None,
    indent: Optional[int] = None,
    width: Optional[int] = None,
    allow_unicode: Optional[bool] = None,
    line_break: Any = None,
    encoding: Any = enc,
    explicit_start: Optional[bool] = None,
    explicit_end: Optional[bool] = None,
    version: Optional[VersionType] = None,
    tags: Any = None,
    block_seq_indent: Any = None,
    top_level_colon_align: Any = None,
    prefix_colon: Any = None,
) -> Any:
    """Removed round-trip dump entry point.

    The arguments are ignored; calling it hands over to ``error_deprecation``,
    which raises ``AttributeError`` and points at ``YAML().dump(...)``.
    """
    # leftover defaulting from the old implementation; the value is never used
    if allow_unicode is None:
        allow_unicode = True
    error_deprecation(fun='round_trip_dump', method='dump')
- # Loader/Dumper are no longer composites, to get to the associated
- # Resolver()/Representer(), etc., you need to instantiate the class
def add_implicit_resolver(
    tag: Any,
    regexp: Any,
    first: Any = None,
    Loader: Any = None,
    Dumper: Any = None,
    resolver: Any = Resolver,
) -> None:
    """Register an implicit scalar detector.

    A plain scalar whose value matches ``regexp`` gets ``tag`` assigned.
    ``first`` is a sequence of possible initial characters, or None.

    With neither ``Loader`` nor ``Dumper`` given, registration happens on
    ``resolver``.  Otherwise each one that is given (truthy) is handled in turn:
    its own ``add_implicit_resolver`` is used when it has one; failing that, a
    subclass of one of the known loader/dumper classes is registered on
    ``Resolver``.

    Raises:
        NotImplementedError: a given Loader/Dumper fits neither case.
    """
    if Loader is None and Dumper is None:
        resolver.add_implicit_resolver(tag, regexp, first)
        return
    for target, is_loader in ((Loader, True), (Dumper, False)):
        if not target:
            continue
        if hasattr(target, 'add_implicit_resolver'):
            target.add_implicit_resolver(tag, regexp, first)
            continue
        if is_loader:
            known = (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
        else:
            known = (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
        if not issubclass(target, known):
            raise NotImplementedError
        Resolver.add_implicit_resolver(tag, regexp, first)
- # this code currently not tested
def add_path_resolver(
    tag: Any,
    path: Any,
    kind: Any = None,
    Loader: Any = None,
    Dumper: Any = None,
    resolver: Any = Resolver,
) -> None:
    """Register a path based resolver for ``tag``.

    ``path`` is a list of keys leading to a node in the representation tree;
    keys can be string values, integers, or None.

    With neither ``Loader`` nor ``Dumper`` given, registration happens on
    ``resolver``.  Otherwise each one that is given (truthy) is handled in turn:
    its own ``add_path_resolver`` is used when it has one; failing that, a
    subclass of one of the known loader/dumper classes is registered on
    ``Resolver``.

    Raises:
        NotImplementedError: a given Loader/Dumper fits neither case.
    """
    if Loader is None and Dumper is None:
        resolver.add_path_resolver(tag, path, kind)
        return
    for target, is_loader in ((Loader, True), (Dumper, False)):
        if not target:
            continue
        if hasattr(target, 'add_path_resolver'):
            target.add_path_resolver(tag, path, kind)
            continue
        if is_loader:
            known = (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
        else:
            known = (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
        if not issubclass(target, known):
            raise NotImplementedError
        Resolver.add_path_resolver(tag, path, kind)
def add_constructor(
    tag: Any, object_constructor: Any, Loader: Any = None, constructor: Any = Constructor,
) -> None:
    """Add an object constructor for the given tag.

    ``object_constructor`` is a function that accepts a Loader instance and a
    node object and produces the corresponding Python object.

    Without ``Loader`` the registration goes to ``constructor``.  With a
    ``Loader`` that has its own ``add_constructor``, that is used; otherwise the
    constructor class matching the loader family is used.

    Raises:
        NotImplementedError: ``Loader`` has no ``add_constructor`` and is not a
            subclass of any known loader class.
    """
    if Loader is None:
        constructor.add_constructor(tag, object_constructor)
        return
    if hasattr(Loader, 'add_constructor'):
        Loader.add_constructor(tag, object_constructor)
        return
    if issubclass(Loader, BaseLoader):
        BaseConstructor.add_constructor(tag, object_constructor)
    elif issubclass(Loader, SafeLoader):
        SafeConstructor.add_constructor(tag, object_constructor)
    # The parameter shadows the module-level ``Loader`` class, so the class has
    # to be named through its module; comparing ``Loader`` with itself was
    # always true and made the branches below unreachable.
    elif issubclass(Loader, ruamel.yaml.loader.Loader):
        Constructor.add_constructor(tag, object_constructor)
    elif issubclass(Loader, RoundTripLoader):
        RoundTripConstructor.add_constructor(tag, object_constructor)
    else:
        raise NotImplementedError
def add_multi_constructor(
    tag_prefix: Any, multi_constructor: Any, Loader: Any = None, constructor: Any = Constructor,  # NOQA
) -> None:
    """Add a multi-constructor for the given tag prefix.

    The multi-constructor is called for a node whose tag starts with
    ``tag_prefix``.  It accepts a Loader instance, a tag suffix and a node
    object and produces the corresponding Python object.

    Without ``Loader`` the registration goes to ``constructor``; otherwise the
    constructor class matching the loader family is used.

    Raises:
        NotImplementedError: ``Loader`` is not a subclass of a known loader class.
    """
    if Loader is None:
        constructor.add_multi_constructor(tag_prefix, multi_constructor)
        return
    # Delegating to Loader.add_multi_constructor directly is intentionally not
    # done here (that shortcut was hard-disabled); dispatch on the loader family.
    families = (
        (BaseLoader, BaseConstructor),
        (SafeLoader, SafeConstructor),
        (ruamel.yaml.loader.Loader, Constructor),
        (RoundTripLoader, RoundTripConstructor),
    )
    for loader_class, constructor_class in families:
        if issubclass(Loader, loader_class):
            constructor_class.add_multi_constructor(tag_prefix, multi_constructor)
            return
    raise NotImplementedError
def add_representer(
    data_type: Any, object_representer: Any, Dumper: Any = None, representer: Any = Representer,  # NOQA
) -> None:
    """Add a representer for the given type.

    ``object_representer`` is a function accepting a Dumper instance and an
    instance of the given data type, producing the corresponding
    representation node.

    Without ``Dumper`` the registration goes to ``representer``.  With a
    ``Dumper`` that has its own ``add_representer``, that is used; otherwise the
    representer class matching the dumper family is used.

    Raises:
        NotImplementedError: ``Dumper`` has no ``add_representer`` and is not a
            subclass of any known dumper class.
    """
    if Dumper is None:
        representer.add_representer(data_type, object_representer)
        return
    if hasattr(Dumper, 'add_representer'):
        Dumper.add_representer(data_type, object_representer)
        return
    if issubclass(Dumper, BaseDumper):
        BaseRepresenter.add_representer(data_type, object_representer)
    elif issubclass(Dumper, SafeDumper):
        SafeRepresenter.add_representer(data_type, object_representer)
    # The parameter shadows the module-level ``Dumper`` class, so the class has
    # to be named through its module; comparing ``Dumper`` with itself was
    # always true and made the branches below unreachable.
    elif issubclass(Dumper, ruamel.yaml.dumper.Dumper):
        Representer.add_representer(data_type, object_representer)
    elif issubclass(Dumper, RoundTripDumper):
        RoundTripRepresenter.add_representer(data_type, object_representer)
    else:
        raise NotImplementedError
- # this code currently not tested
def add_multi_representer(
    data_type: Any, multi_representer: Any, Dumper: Any = None, representer: Any = Representer,
) -> None:
    """Add a multi-representer for the given type.

    ``multi_representer`` is a function accepting a Dumper instance and an
    instance of the given data type or subtype, producing the corresponding
    representation node.

    Without ``Dumper`` the registration goes to ``representer``.  With a
    ``Dumper`` that has its own ``add_multi_representer``, that is used;
    otherwise the representer class matching the dumper family is used.

    Raises:
        NotImplementedError: ``Dumper`` has no ``add_multi_representer`` and is
            not a subclass of any known dumper class.
    """
    if Dumper is None:
        representer.add_multi_representer(data_type, multi_representer)
        return
    if hasattr(Dumper, 'add_multi_representer'):
        Dumper.add_multi_representer(data_type, multi_representer)
        return
    if issubclass(Dumper, BaseDumper):
        BaseRepresenter.add_multi_representer(data_type, multi_representer)
    elif issubclass(Dumper, SafeDumper):
        SafeRepresenter.add_multi_representer(data_type, multi_representer)
    # The parameter shadows the module-level ``Dumper`` class, so the class has
    # to be named through its module; comparing ``Dumper`` with itself was
    # always true and made the branches below unreachable.
    elif issubclass(Dumper, ruamel.yaml.dumper.Dumper):
        Representer.add_multi_representer(data_type, multi_representer)
    elif issubclass(Dumper, RoundTripDumper):
        RoundTripRepresenter.add_multi_representer(data_type, multi_representer)
    else:
        raise NotImplementedError
class YAMLObjectMetaclass(type):
    """Metaclass for YAMLObject that registers tagged classes on creation.

    A class whose own body defines a non-None ``yaml_tag`` gets its
    ``from_yaml`` registered on ``yaml_constructor`` under that tag and its
    ``to_yaml`` registered on ``yaml_representer`` for the class itself.
    An inherited ``yaml_tag`` does not trigger registration.
    """

    def __init__(cls, name: Any, bases: Any, kwds: Any) -> None:
        super().__init__(name, bases, kwds)
        # only the class body's own namespace counts, not inherited attributes
        if kwds.get('yaml_tag') is None:
            return
        cls.yaml_constructor.add_constructor(cls.yaml_tag, cls.from_yaml)  # type: ignore
        cls.yaml_representer.add_representer(cls, cls.to_yaml)  # type: ignore
class YAMLObject(with_metaclass(YAMLObjectMetaclass)):  # type: ignore
    """Base class for objects that dump themselves to, and load themselves
    from, a YAML stream.

    Subclasses set ``yaml_tag`` (and optionally ``yaml_flow_style``); the
    conversion itself is delegated to the constructor/representer passed in.
    """

    __slots__ = ()  # no direct instantiation, so allow immutable subclasses

    # classes on which tagged subclasses are registered
    yaml_constructor = Constructor
    yaml_representer = Representer

    yaml_tag: Any = None
    yaml_flow_style: Any = None

    @classmethod
    def from_yaml(cls, constructor: Any, node: Any) -> Any:
        """Build an instance of ``cls`` from a representation node via
        ``constructor.construct_yaml_object``."""
        return constructor.construct_yaml_object(node, cls)

    @classmethod
    def to_yaml(cls, representer: Any, data: Any) -> Any:
        """Turn ``data`` into a representation node via
        ``representer.represent_yaml_object``, using the class tag and flow style."""
        tag, style = cls.yaml_tag, cls.yaml_flow_style
        return representer.represent_yaml_object(tag, data, cls, flow_style=style)
|