# Untitled
# unknown
# plain_text
# 2 years ago
# 44 kB
# 7
# Indexable
# noinspection PyUnresolvedReferences
import builtins # noqa
import collections
import collections.abc
import datetime
import enum
import ipaddress
import os
import pathlib
import typing
import uuid
# noinspection PyUnresolvedReferences
from base64 import decodebytes, encodebytes # noqa
from contextlib import contextmanager, suppress
# noinspection PyProtectedMember
from dataclasses import _FIELDS, MISSING, Field, is_dataclass # type: ignore
from decimal import Decimal
from fractions import Fraction
from mashumaro.config import TO_DICT_ADD_OMIT_NONE_FLAG, BaseConfig
# noinspection PyUnresolvedReferences
from mashumaro.exceptions import ( # noqa
BadHookSignature,
InvalidFieldValue,
MissingField,
UnserializableDataError,
UnserializableField,
)
from mashumaro.meta.helpers import (
get_class_that_define_method,
get_imported_module_names,
get_type_origin,
is_class_var,
is_generic,
is_init_var,
is_special_typing_primitive,
is_type_var,
is_union,
type_name,
)
from mashumaro.meta.macros import PY_37_MIN
from mashumaro.meta.patch import patch_fromisoformat
from mashumaro.serializer.base.helpers import * # noqa
from mashumaro.types import SerializableType, SerializationStrategy
# Apply the imported ``fromisoformat`` patch once, at import time.
# NOTE(review): presumably a backport for interpreters without a native
# ``fromisoformat`` -- confirm in mashumaro.meta.patch.
patch_fromisoformat()
# The type of ``None``; compared against Union arguments and origin types.
NoneType = type(None)
# Snapshot taken at import time; CodeBuilder.reset() copies it as the initial
# set of modules for which no import code has to be generated.
INITIAL_MODULES = get_imported_module_names()
# Names of the optional hook methods looked up via get_declared_hook().
__PRE_SERIALIZE__ = "__pre_serialize__"
__PRE_DESERIALIZE__ = "__pre_deserialize__"
__POST_SERIALIZE__ = "__post_serialize__"
__POST_DESERIALIZE__ = "__post_deserialize__"
# get_declared_hook() ignores a hook whose defining class has this type name.
DataClassDictMixinPath = "mashumaro.serializer.base.dict.DataClassDictMixin"
class CodeLines:
    """Accumulate generated source lines under a running indentation prefix."""

    def __init__(self):
        self._lines: typing.List[str] = []
        self._current_indent: str = ""

    def append(self, line: str):
        """Store ``line`` prefixed with the current indentation."""
        self._lines.append("{}{}".format(self._current_indent, line))

    @contextmanager
    def indent(self) -> typing.Generator[None, None, None]:
        """Indent every line appended inside the ``with`` body by one level."""
        step = "    "
        self._current_indent = self._current_indent + step
        try:
            yield
        finally:
            # Remove exactly one level, even when the body raised.
            self._current_indent = self._current_indent[: -len(step)]

    def as_text(self) -> str:
        """Return all stored lines joined with newlines."""
        separator = "\n"
        return separator.join(self._lines)

    def reset(self):
        """Forget all stored lines and return to zero indentation."""
        self._current_indent = ""
        self._lines = []
class CodeBuilder:
def __init__(self, cls):
    """Create a builder that generates methods for ``cls``."""
    # The tracking sets start empty; reset() re-seeds them before each build.
    self.globals: typing.Set[str] = set()
    self.modules: typing.Set[str] = set()
    self.lines: CodeLines = CodeLines()
    self.cls = cls
def reset(self) -> None:
    """Clear generated lines and restore module/global tracking state."""
    self.globals = set()
    # Start from a private copy so additions never leak into the snapshot.
    self.modules = INITIAL_MODULES.copy()
    self.lines.reset()
@property
def namespace(self) -> typing.Dict[typing.Any, typing.Any]:
return self.cls.__dict__
@property
def annotations(self) -> typing.Dict[str, typing.Any]:
return self.namespace.get("__annotations__", {})
def __get_field_types(
    self, recursive=True
) -> typing.Dict[str, typing.Any]:
    """Map field names to resolved type hints.

    ClassVar and InitVar annotations are skipped. With ``recursive=False``
    only names present in this class's own annotations are kept.
    """
    resolved_hints = typing.get_type_hints(self.cls)
    return {
        name: hint
        for name, hint in resolved_hints.items()
        if not (is_class_var(hint) or is_init_var(hint))
        and (recursive or name in self.annotations)
    }
@property
def field_types(self) -> typing.Dict[str, typing.Any]:
    """Resolved type hints of all serializable fields of the class."""
    return self.__get_field_types(recursive=True)
@property
def defaults(self) -> typing.Dict[str, typing.Any]:
    """Map field names to their default (or default factory).

    Dataclass ancestors are visited from the end of the MRO towards the
    class itself, so nearer ancestors overwrite farther ones; names
    annotated on the class itself are applied last. A field without any
    default maps to ``MISSING``.
    """
    resolved = {}
    for base in reversed(self.cls.__mro__[1:]):
        if not is_dataclass(base):
            continue
        for dc_field in getattr(base, _FIELDS).values():
            if dc_field.default is MISSING:
                resolved[dc_field.name] = dc_field.default_factory
            else:
                resolved[dc_field.name] = dc_field.default
    for attr_name in self.__get_field_types(recursive=False):
        declared = self.namespace.get(attr_name, MISSING)
        if not isinstance(declared, Field):
            # A plain class attribute (or MISSING when nothing is assigned).
            resolved[attr_name] = declared
        elif declared.default is MISSING:
            # https://github.com/python/mypy/issues/6910
            resolved[attr_name] = declared.default_factory  # type: ignore
        else:
            resolved[attr_name] = declared.default
    return resolved
@property
def metadatas(self) -> typing.Dict[str, typing.Mapping[str, typing.Any]]:
    """Map field names to their ``dataclasses.Field`` metadata.

    Dataclass ancestors are visited from the end of the MRO towards the
    class itself; ``Field`` objects assigned in the class's own namespace
    are applied last.
    """
    collected = {}
    for base in reversed(self.cls.__mro__[1:]):
        if not is_dataclass(base):
            continue
        for dc_field in getattr(base, _FIELDS).values():
            collected[dc_field.name] = dc_field.metadata
    own_namespace = self.namespace
    for attr_name in self.__get_field_types(recursive=False):
        declared = own_namespace.get(attr_name, MISSING)
        if isinstance(declared, Field):
            collected[attr_name] = declared.metadata
    return collected
def _add_type_modules(self, *types_) -> None:
for t in types_:
module = getattr(t, "__module__", None)
if not module:
return
self.ensure_module_imported(module)
args = getattr(t, "__args__", ())
if args:
self._add_type_modules(*args)
constraints = getattr(t, "__constraints__", ())
if constraints:
self._add_type_modules(*constraints)
def ensure_module_imported(self, module: str) -> None:
if module not in self.modules:
self.modules.add(module)
self.add_line(f"if '{module}' not in globals():")
with self.indent():
self.add_line(f"import {module}")
root_module = module.split(".")[0]
if root_module not in self.globals:
self.globals.add(root_module)
self.add_line("else:")
with self.indent():
self.add_line(f"global {root_module}")
def add_line(self, line: str) -> None:
self.lines.append(line)
@contextmanager
def indent(self) -> typing.Generator[None, None, None]:
with self.lines.indent():
yield
def compile(self) -> None:
    """Execute the generated source.

    The code runs with this module's globals and the builder's own
    attribute dict as locals, so a generated ``setattr(cls, ...)`` line
    resolves ``cls`` to ``self.cls``. With ``debug`` enabled in the config,
    the class and the source are printed first.
    """
    source = self.lines.as_text()
    config = self.get_config()
    if config.debug:
        print(self.cls)
        print(source)
    exec(source, globals(), self.__dict__)
def get_declared_hook(self, method_name: str):
    """Return the raw hook object declared for ``method_name``, if any.

    ``None`` is returned when the class has no such attribute, or when the
    class defining it is the one named by ``DataClassDictMixinPath``.
    Otherwise the entry is read from the defining class's ``__dict__``, so
    classmethod/staticmethod wrappers are returned unbound.
    """
    if not hasattr(self.cls, method_name):
        return None
    owner = get_class_that_define_method(method_name, self.cls)
    if type_name(owner) == DataClassDictMixinPath:
        return None
    return owner.__dict__[method_name]
def add_from_dict(self) -> None:
    """Generate a ``from_dict`` classmethod and attach it to the class.

    The generated method optionally calls the pre-deserialize hook, builds
    ``kwargs`` field by field (``None`` is passed through, absent required
    fields raise ``MissingField``, conversion failures raise
    ``InvalidFieldValue``), and returns ``cls(**kwargs)``, optionally
    routed through the post-deserialize hook.

    :raises BadHookSignature: if a declared pre/post deserialize hook is
        not a classmethod
    """
    self.reset()
    self.add_line("@classmethod")
    self.add_line(
        "def from_dict(cls, d, use_bytes=False, use_enum=False, "
        "use_datetime=False):"
    )
    with self.indent():
        pre_deserialize = self.get_declared_hook(__PRE_DESERIALIZE__)
        if pre_deserialize:
            if not isinstance(pre_deserialize, classmethod):
                raise BadHookSignature(
                    f"`{__PRE_DESERIALIZE__}` must be a class method with "
                    f"Callable[[Dict[Any, Any]], Dict[Any, Any]] signature"
                )
            else:
                self.add_line(f"d = cls.{__PRE_DESERIALIZE__}(d)")
        self.add_line("try:")
        with self.indent():
            self.add_line("kwargs = {}")
            field_types = self.field_types
            # Both properties rebuild their dict on every access (walking
            # the MRO and resolving type hints), so read them once instead
            # of once per field.
            metadatas = self.metadatas
            defaults = self.defaults

            def add_guarded_assignment(fname, ftype):
                # Emit ``kwargs[fname] = <unpack expr>`` wrapped in a
                # try/except that re-raises as InvalidFieldValue.
                # _unpack_field_value() is called first and at the current
                # indentation because it may emit import lines itself.
                unpacked_value = self._unpack_field_value(
                    fname=fname,
                    ftype=ftype,
                    parent=self.cls,
                    metadata=metadatas.get(fname),
                )
                self.add_line("try:")
                with self.indent():
                    self.add_line(f"kwargs['{fname}'] = {unpacked_value}")
                self.add_line("except Exception as e:")
                with self.indent():
                    field_type = type_name(ftype)
                    self.add_line(
                        f"raise InvalidFieldValue('{fname}',"
                        f"{field_type},value,cls)"
                    )

            for fname, ftype in field_types.items():
                self._add_type_modules(ftype)
                self.add_line(f"value = d.get('{fname}', MISSING)")
                self.add_line("if value is None:")
                with self.indent():
                    self.add_line(f"kwargs['{fname}'] = None")
                self.add_line("else:")
                with self.indent():
                    if defaults[fname] is MISSING:
                        # Required field: absence is an error.
                        self.add_line("if value is MISSING:")
                        with self.indent():
                            self.add_line(
                                f"raise MissingField('{fname}',"
                                f"{type_name(ftype)},cls)"
                            )
                        self.add_line("else:")
                    else:
                        # Field with a default: absent keys are left out of
                        # kwargs so the dataclass default applies.
                        self.add_line("if value is not MISSING:")
                    with self.indent():
                        add_guarded_assignment(fname, ftype)
        self.add_line("except AttributeError:")
        with self.indent():
            # ``d.get`` fails with AttributeError when ``d`` is not a
            # mapping; report that case clearly, re-raise anything else.
            self.add_line("if not isinstance(d, dict):")
            with self.indent():
                self.add_line(
                    f"raise ValueError('Argument for "
                    f"{type_name(self.cls)}.from_dict method "
                    f"should be a dict instance') from None"
                )
            self.add_line("else:")
            with self.indent():
                self.add_line("raise")
        post_deserialize = self.get_declared_hook(__POST_DESERIALIZE__)
        if post_deserialize:
            if not isinstance(post_deserialize, classmethod):
                raise BadHookSignature(
                    f"`{__POST_DESERIALIZE__}` must be a class method "
                    f"with Callable[[{type_name(self.cls)}], "
                    f"{type_name(self.cls)}] signature"
                )
            else:
                self.add_line(
                    f"return cls.{__POST_DESERIALIZE__}(cls(**kwargs))"
                )
        else:
            self.add_line("return cls(**kwargs)")
    self.add_line("setattr(cls, 'from_dict', from_dict)")
    self.compile()
def get_config(self, cls=None) -> typing.Type[BaseConfig]:
    """Return the config class for ``cls`` (default: the builder's class).

    A ``Config`` attribute that already subclasses ``BaseConfig`` is
    returned unchanged. Otherwise a new ``Config`` type deriving from both
    is created, with the user's attributes layered over ``BaseConfig``'s.
    A class without ``Config`` yields ``BaseConfig`` itself.
    """
    owner = self.cls if cls is None else cls
    config_cls = getattr(owner, "Config", BaseConfig)
    if issubclass(config_cls, BaseConfig):
        return config_cls
    merged_namespace = {**BaseConfig.__dict__, **config_cls.__dict__}
    return type("Config", (BaseConfig, config_cls), merged_namespace)
def get_to_dict_flags(self, cls=None) -> str:
    """Return the comma-separated flag names to forward to ``to_dict``.

    ``omit_none`` is included only when the config of ``cls`` enables
    ``TO_DICT_ADD_OMIT_NONE_FLAG``.
    """
    flag_names = ["use_bytes", "use_enum", "use_datetime"]
    options = self.get_config(cls).code_generation_options
    if TO_DICT_ADD_OMIT_NONE_FLAG in options:
        flag_names.append("omit_none")
    return ",".join(flag_names)
def get_to_dict_default_flag_values(self, cls=None) -> str:
    """Return the ``to_dict`` flag parameters with ``False`` defaults.

    The result is spliced into a generated ``def`` signature;
    ``omit_none=False`` is appended only when the config of ``cls`` enables
    ``TO_DICT_ADD_OMIT_NONE_FLAG``.
    """
    options = self.get_config(cls).code_generation_options
    flag_names = ["use_bytes", "use_enum", "use_datetime"]
    if TO_DICT_ADD_OMIT_NONE_FLAG in options:
        flag_names.append("omit_none")
    return ", ".join(f"{flag}=False" for flag in flag_names)
def add_to_dict(self) -> None:
    """Generate a ``to_dict`` method and attach it to the class.

    The generated method optionally calls the pre-serialize hook, packs
    every field into ``kwargs`` (``None`` values are stored as-is, or
    dropped when the optional ``omit_none`` flag is enabled and passed),
    and returns ``kwargs``, optionally routed through the post-serialize
    hook.
    """
    self.reset()
    omit_none_feature = (
        TO_DICT_ADD_OMIT_NONE_FLAG
        in self.get_config().code_generation_options
    )
    self.add_line(
        f"def to_dict(self, {self.get_to_dict_default_flag_values()}):"
    )
    with self.indent():
        pre_serialize = self.get_declared_hook(__PRE_SERIALIZE__)
        if pre_serialize:
            self.add_line(f"self = self.{__PRE_SERIALIZE__}()")
        self.add_line("kwargs = {}")
        field_types = self.field_types
        # The property rebuilds its dict on every access (walking the MRO
        # and resolving type hints), so read it once, not once per field.
        metadatas = self.metadatas
        for fname, ftype in field_types.items():
            metadata = metadatas.get(fname)
            self.add_line(f"value = getattr(self, '{fname}')")
            self.add_line("if value is None:")
            with self.indent():
                if omit_none_feature:
                    self.add_line("if not omit_none:")
                    with self.indent():
                        self.add_line(f"kwargs['{fname}'] = None")
                else:
                    self.add_line(f"kwargs['{fname}'] = None")
            self.add_line("else:")
            with self.indent():
                packed_value = self._pack_value(
                    fname=fname,
                    ftype=ftype,
                    parent=self.cls,
                    metadata=metadata,
                )
                self.add_line(f"kwargs['{fname}'] = {packed_value}")
        post_serialize = self.get_declared_hook(__POST_SERIALIZE__)
        if post_serialize:
            self.add_line(f"return self.{__POST_SERIALIZE__}(kwargs)")
        else:
            self.add_line("return kwargs")
    self.add_line("setattr(cls, 'to_dict', to_dict)")
    self.compile()
def _pack_value(
    self, fname, ftype, parent, value_name="value", metadata=None
):
    """Build the source expression that serializes one value of ``ftype``.

    The returned string is Python source spliced into generated code in
    which ``value_name`` names the variable holding the value.

    A custom serializer is taken from ``metadata["serialize"]``, then from
    ``metadata["serialization_strategy"]``, then from the config's
    ``serialization_strategy`` entry for ``ftype``. When callable, it is
    stored on the class as a static method and called in place of the
    built-in conversion.

    :param fname: field name, used for helper attribute names and errors
    :param ftype: the (possibly generic) type to serialize
    :param parent: class owning the field, forwarded to errors/helpers
    :param value_name: variable name of the value in the generated code
    :param metadata: field metadata mapping, or ``None``
    :return: source of the packing expression
    :raises UnserializableDataError: for AnyStr, TypeVars, other special
        typing primitives, and mappings keyed by dataclasses
    :raises UnserializableField: for bare containers and unknown types
    """
    serialize_option: typing.Optional[typing.Any] = None
    overridden: typing.Optional[str] = None
    if metadata is not None:
        serialize_option = metadata.get("serialize")
        if serialize_option is None:
            strategy = metadata.get("serialization_strategy")
            if isinstance(strategy, SerializationStrategy):
                serialize_option = strategy.serialize
    if serialize_option is None:
        strategy = self.get_config().serialization_strategy.get(ftype)
        if isinstance(strategy, dict):
            serialize_option = strategy.get("serialize")
        elif isinstance(strategy, SerializationStrategy):
            serialize_option = strategy.serialize
    if callable(serialize_option):
        setattr(
            self.cls,
            f"__{fname}_serialize",
            staticmethod(serialize_option),
        )
        # Apply the serializer to the value being packed. At top level this
        # is the field value itself; for a nested element (reached through
        # a config-level strategy) it must be the element, not the field.
        overridden = f"self.__{fname}_serialize({value_name})"
    if is_dataclass(ftype):
        flags = self.get_to_dict_flags(ftype)
        return overridden or f"{value_name}.to_dict({flags})"
    # issubclass() raises TypeError for non-class annotations (generics).
    with suppress(TypeError):
        if issubclass(ftype, SerializableType):
            return overridden or f"{value_name}._serialize()"
    origin_type = get_type_origin(ftype)
    if is_special_typing_primitive(origin_type):
        if origin_type is typing.Any:
            return overridden or value_name
        elif is_union(ftype):
            args = getattr(ftype, "__args__", ())
            if len(args) == 2 and args[1] == NoneType:  # it is Optional
                # Keep value_name so a nested Optional (e.g. a mapping key)
                # still refers to the right variable.
                return self._pack_value(
                    fname, args[0], parent, value_name, metadata=metadata
                )
            else:
                method_name = self._add_pack_union(
                    fname, ftype, args, parent, metadata
                )
                return (
                    f"self.{method_name}({value_name},"
                    f"{self.get_to_dict_flags()})"
                )
        elif origin_type is typing.AnyStr:
            raise UnserializableDataError(
                "AnyStr is not supported by mashumaro"
            )
        elif is_type_var(ftype):
            raise UnserializableDataError(
                "TypeVars are not supported by mashumaro"
            )
        else:
            raise UnserializableDataError(
                f"{ftype} as a field type is not supported by mashumaro"
            )
    elif origin_type is int:
        return overridden or f"int({value_name})"
    elif origin_type is float:
        return overridden or f"float({value_name})"
    elif origin_type in (bool, NoneType):
        return overridden or value_name
    elif origin_type in (datetime.datetime, datetime.date, datetime.time):
        if overridden:
            return f"{value_name} if use_datetime else {overridden}"
        return (
            f"{value_name} if use_datetime else {value_name}.isoformat()"
        )
    elif origin_type is datetime.timedelta:
        return overridden or f"{value_name}.total_seconds()"
    elif origin_type is datetime.timezone:
        return overridden or f"{value_name}.tzname(None)"
    elif origin_type is uuid.UUID:
        return overridden or f"str({value_name})"
    elif origin_type in [
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Interface,
        ipaddress.IPv6Interface,
    ]:
        return overridden or f"str({value_name})"
    elif origin_type is Decimal:
        return overridden or f"str({value_name})"
    elif origin_type is Fraction:
        return overridden or f"str({value_name})"
    elif issubclass(origin_type, typing.Collection) and not issubclass(
        origin_type, enum.Enum
    ):
        args = getattr(ftype, "__args__", ())

        def inner_expr(arg_num=0, v_name="value", v_type=None):
            # Packing expression for one element of the container: either
            # the explicitly given type or the arg_num-th type argument.
            if v_type:
                return self._pack_value(fname, v_type, parent, v_name)
            else:
                return self._pack_value(
                    fname, args[arg_num], parent, v_name
                )

        if issubclass(origin_type, typing.ByteString):
            specific = f"encodebytes({value_name}).decode()"
            return (
                f"{value_name} if use_bytes else {overridden or specific}"
            )
        elif issubclass(origin_type, str):
            return overridden or value_name
        elif issubclass(
            origin_type,
            (typing.List, typing.Deque, typing.Tuple, typing.AbstractSet),
        ):
            if is_generic(ftype):
                return (
                    overridden
                    or f"[{inner_expr()} for value in {value_name}]"
                )
            elif ftype is list:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.List[T] instead"
                )
            elif ftype is collections.deque:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.Deque[T] instead"
                )
            elif ftype is tuple:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.Tuple[T] instead"
                )
            elif ftype is set:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.Set[T] instead"
                )
            elif ftype is frozenset:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.FrozenSet[T] instead"
                )
        elif issubclass(origin_type, typing.ChainMap):
            if ftype is collections.ChainMap:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.ChainMap[KT,VT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "ChainMaps with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    return (
                        overridden
                        or f'[{{{inner_expr(0,"key")}:{inner_expr(1)} '
                        f"for key,value in m.items()}} "
                        f"for m in {value_name}.maps]"
                    )
        elif PY_37_MIN and issubclass(origin_type, typing.OrderedDict):
            if ftype is collections.OrderedDict:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.OrderedDict[KT,VT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "OrderedDict with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    return (
                        overridden
                        or f'{{{inner_expr(0, "key")}: {inner_expr(1)} '
                        f"for key, value in {value_name}.items()}}"
                    )
        elif issubclass(origin_type, typing.Counter):
            if ftype is collections.Counter:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.Counter[KT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "Counter with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    # Counter has a single type argument; counts are ints.
                    return (
                        overridden
                        or f'{{{inner_expr(0, "key")}: '
                        f"{inner_expr(1, v_type=int)} "
                        f"for key, value in {value_name}.items()}}"
                    )
        elif issubclass(origin_type, typing.Mapping):
            if ftype is dict:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.Dict[KT,VT] or Mapping[KT,VT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "Mappings with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    return (
                        overridden
                        or f'{{{inner_expr(0,"key")}: {inner_expr(1)} '
                        f"for key, value in {value_name}.items()}}"
                    )
        elif issubclass(origin_type, typing.Sequence):
            if is_generic(ftype):
                return (
                    overridden
                    or f"[{inner_expr()} for value in {value_name}]"
                )
    elif issubclass(origin_type, os.PathLike):
        return overridden or f"{value_name}.__fspath__()"
    elif issubclass(origin_type, enum.Enum):
        specific = f"{value_name}.value"
        return f"{value_name} if use_enum else {overridden or specific}"
    elif overridden:
        return overridden
    # No branch produced an expression for this type.
    raise UnserializableField(fname, ftype, parent)
def _unpack_field_value(
    self, fname, ftype, parent, value_name="value", metadata=None
):
    """Build the source expression that deserializes a value of ``ftype``.

    The returned string is Python source spliced into generated code in
    which ``value_name`` names the variable holding the raw value.

    A custom deserializer is taken from ``metadata["deserialize"]``, then
    from ``metadata["serialization_strategy"]``, then from the config's
    ``serialization_strategy`` entry for ``ftype``. When callable, it is
    stored on the class and called in place of the built-in conversion.
    For datetime/date/time fields a non-callable option selects a parsing
    engine (``"ciso8601"`` or ``"pendulum"``).

    :param fname: field name, used for helper attribute names and errors
    :param ftype: the (possibly generic) type to deserialize
    :param parent: class owning the field, forwarded to errors/helpers
    :param value_name: variable name of the value in the generated code
    :param metadata: field metadata mapping, or ``None``
    :return: source of the unpacking expression
    :raises UnserializableDataError: for AnyStr, TypeVars, other special
        typing primitives, and mappings keyed by dataclasses
    :raises UnserializableField: for bare containers, unknown datetime
        engines and unknown types
    """
    deserialize_option: typing.Optional[typing.Any] = None
    overridden: typing.Optional[str] = None
    if metadata is not None:
        deserialize_option = metadata.get("deserialize")
        if deserialize_option is None:
            strategy = metadata.get("serialization_strategy")
            if isinstance(strategy, SerializationStrategy):
                deserialize_option = strategy.deserialize
    if deserialize_option is None:
        strategy = self.get_config().serialization_strategy.get(ftype)
        if isinstance(strategy, dict):
            deserialize_option = strategy.get("deserialize")
        elif isinstance(strategy, SerializationStrategy):
            deserialize_option = strategy.deserialize
    if callable(deserialize_option):
        setattr(self.cls, f"__{fname}_deserialize", deserialize_option)
        overridden = f"cls.__{fname}_deserialize({value_name})"
    if is_dataclass(ftype):
        return overridden or (
            f"{type_name(ftype)}.from_dict({value_name}, "
            f"use_bytes, use_enum, use_datetime)"
        )
    # issubclass() raises TypeError for non-class annotations (generics).
    with suppress(TypeError):
        if issubclass(ftype, SerializableType):
            return (
                overridden
                or f"{type_name(ftype)}._deserialize({value_name})"
            )
    origin_type = get_type_origin(ftype)
    if is_special_typing_primitive(origin_type):
        if origin_type is typing.Any:
            return overridden or value_name
        elif is_union(ftype):
            args = getattr(ftype, "__args__", ())
            if len(args) == 2 and args[1] == NoneType:  # it is Optional
                # Keep value_name so a nested Optional (e.g. a mapping key)
                # still refers to the right variable.
                return self._unpack_field_value(
                    fname, args[0], parent, value_name, metadata=metadata
                )
            else:
                method_name = self._add_unpack_union(
                    fname, ftype, args, parent, metadata
                )
                return (
                    f"cls.{method_name}({value_name},"
                    f"use_bytes,use_enum,use_datetime)"
                )
        elif origin_type is typing.AnyStr:
            raise UnserializableDataError(
                "AnyStr is not supported by mashumaro"
            )
        elif is_type_var(ftype):
            raise UnserializableDataError(
                "TypeVars are not supported by mashumaro"
            )
        else:
            raise UnserializableDataError(
                f"{ftype} as a field type is not supported by mashumaro"
            )
    elif origin_type is int:
        return overridden or f"int({value_name})"
    elif origin_type is float:
        return overridden or f"float({value_name})"
    elif origin_type in (bool, NoneType):
        return overridden or value_name
    elif origin_type in (datetime.datetime, datetime.date, datetime.time):
        if overridden:
            return f"{value_name} if use_datetime else {overridden}"
        elif deserialize_option is not None:
            # A non-callable option names an alternative parsing engine.
            if deserialize_option == "ciso8601":
                self.ensure_module_imported("ciso8601")
                datetime_parser = "ciso8601.parse_datetime"
            elif deserialize_option == "pendulum":
                self.ensure_module_imported("pendulum")
                datetime_parser = "pendulum.parse"
            else:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    f"Unsupported deserialization engine "
                    f'"{deserialize_option}"',
                )
            # The parsers are called the same way for all three types; the
            # suffix narrows the parsed result to a date or a time.
            suffix = ""
            if origin_type is datetime.date:
                suffix = ".date()"
            elif origin_type is datetime.time:
                suffix = ".time()"
            return (
                f"{value_name} if use_datetime else "
                f"{datetime_parser}({value_name}){suffix}"
            )
        return (
            f"{value_name} if use_datetime else "
            f"datetime.{origin_type.__name__}."
            f"fromisoformat({value_name})"
        )
    elif origin_type is datetime.timedelta:
        return overridden or f"datetime.timedelta(seconds={value_name})"
    elif origin_type is datetime.timezone:
        return overridden or f"parse_timezone({value_name})"
    elif origin_type is uuid.UUID:
        return overridden or f"uuid.UUID({value_name})"
    elif origin_type is ipaddress.IPv4Address:
        return overridden or f"ipaddress.IPv4Address({value_name})"
    elif origin_type is ipaddress.IPv6Address:
        return overridden or f"ipaddress.IPv6Address({value_name})"
    elif origin_type is ipaddress.IPv4Network:
        return overridden or f"ipaddress.IPv4Network({value_name})"
    elif origin_type is ipaddress.IPv6Network:
        return overridden or f"ipaddress.IPv6Network({value_name})"
    elif origin_type is ipaddress.IPv4Interface:
        return overridden or f"ipaddress.IPv4Interface({value_name})"
    elif origin_type is ipaddress.IPv6Interface:
        return overridden or f"ipaddress.IPv6Interface({value_name})"
    elif origin_type is Decimal:
        return overridden or f"Decimal({value_name})"
    elif origin_type is Fraction:
        return overridden or f"Fraction({value_name})"
    elif issubclass(origin_type, typing.Collection) and not issubclass(
        origin_type, enum.Enum
    ):
        args = getattr(ftype, "__args__", ())

        def inner_expr(arg_num=0, v_name="value", v_type=None):
            # Unpacking expression for one element of the container: either
            # the explicitly given type or the arg_num-th type argument.
            if v_type:
                return self._unpack_field_value(
                    fname, v_type, parent, v_name
                )
            else:
                return self._unpack_field_value(
                    fname, args[arg_num], parent, v_name
                )

        if issubclass(origin_type, typing.ByteString):
            if origin_type is bytes:
                specific = f"decodebytes({value_name}.encode())"
                return (
                    f"{value_name} if use_bytes else "
                    f"{overridden or specific}"
                )
            elif origin_type is bytearray:
                if overridden:
                    overridden = (
                        f"bytearray({value_name}) if use_bytes else "
                        f"{overridden}"
                    )
                specific = (
                    f"bytearray({value_name} if use_bytes else "
                    f"decodebytes({value_name}.encode()))"
                )
                return overridden or specific
        elif issubclass(origin_type, str):
            return overridden or value_name
        elif issubclass(origin_type, typing.List):
            if is_generic(ftype):
                return (
                    overridden
                    or f"[{inner_expr()} for value in {value_name}]"
                )
            elif ftype is list:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.List[T] instead"
                )
        elif issubclass(origin_type, typing.Deque):
            if is_generic(ftype):
                return (
                    overridden
                    or f"collections.deque([{inner_expr()} "
                    f"for value in {value_name}])"
                )
            elif ftype is collections.deque:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.Deque[T] instead"
                )
        elif issubclass(origin_type, typing.Tuple):
            if is_generic(ftype):
                return (
                    overridden
                    or f"tuple([{inner_expr()} for value in {value_name}])"
                )
            elif ftype is tuple:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.Tuple[T] instead"
                )
        elif issubclass(origin_type, typing.FrozenSet):
            if is_generic(ftype):
                return (
                    overridden
                    or f"frozenset([{inner_expr()} "
                    f"for value in {value_name}])"
                )
            elif ftype is frozenset:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.FrozenSet[T] instead"
                )
        elif issubclass(origin_type, typing.AbstractSet):
            if is_generic(ftype):
                return (
                    overridden
                    or f"set([{inner_expr()} for value in {value_name}])"
                )
            elif ftype is set:
                raise UnserializableField(
                    fname, ftype, parent, "Use typing.Set[T] instead"
                )
        elif issubclass(origin_type, typing.ChainMap):
            if ftype is collections.ChainMap:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.ChainMap[KT,VT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "ChainMaps with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    return (
                        overridden
                        or f"collections.ChainMap("
                        f'*[{{{inner_expr(0,"key")}:{inner_expr(1)} '
                        f"for key, value in m.items()}} "
                        f"for m in {value_name}])"
                    )
        elif PY_37_MIN and issubclass(origin_type, typing.OrderedDict):
            if ftype is collections.OrderedDict:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.OrderedDict[KT,VT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "OrderedDict with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    return (
                        overridden
                        or f"collections.OrderedDict("
                        f'{{{inner_expr(0,"key")}: {inner_expr(1)} '
                        f"for key, value in {value_name}.items()}})"
                    )
        elif issubclass(origin_type, typing.Counter):
            if ftype is collections.Counter:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.Counter[KT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "Counter with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    # Counter has a single type argument; counts are ints.
                    return (
                        overridden
                        or f"collections.Counter("
                        f'{{{inner_expr(0,"key")}: '
                        f"{inner_expr(1, v_type=int)} "
                        f"for key, value in {value_name}.items()}})"
                    )
        elif issubclass(origin_type, typing.Mapping):
            if ftype is dict:
                raise UnserializableField(
                    fname,
                    ftype,
                    parent,
                    "Use typing.Dict[KT,VT] or Mapping[KT,VT] instead",
                )
            elif is_generic(ftype):
                if is_dataclass(args[0]):
                    raise UnserializableDataError(
                        "Mappings with dataclasses as keys "
                        "are not supported by mashumaro"
                    )
                else:
                    return (
                        overridden
                        or f'{{{inner_expr(0,"key")}: {inner_expr(1)} '
                        f"for key, value in {value_name}.items()}}"
                    )
        elif issubclass(origin_type, typing.Sequence):
            if is_generic(ftype):
                return (
                    overridden
                    or f"[{inner_expr()} for value in {value_name}]"
                )
    elif issubclass(origin_type, os.PathLike):
        # Most specific pathlib classes are tested first.
        if overridden:
            return overridden
        elif issubclass(origin_type, pathlib.PosixPath):
            return f"pathlib.PosixPath({value_name})"
        elif issubclass(origin_type, pathlib.WindowsPath):
            return f"pathlib.WindowsPath({value_name})"
        elif issubclass(origin_type, pathlib.Path):
            return f"pathlib.Path({value_name})"
        elif issubclass(origin_type, pathlib.PurePosixPath):
            return f"pathlib.PurePosixPath({value_name})"
        elif issubclass(origin_type, pathlib.PureWindowsPath):
            return f"pathlib.PureWindowsPath({value_name})"
        elif issubclass(origin_type, pathlib.PurePath):
            return f"pathlib.PurePath({value_name})"
        elif origin_type is os.PathLike:
            return f"pathlib.PurePath({value_name})"
        else:
            return f"{type_name(origin_type)}({value_name})"
    elif issubclass(origin_type, enum.Enum):
        specific = f"{type_name(origin_type)}({value_name})"
        return f"{value_name} if use_enum else {overridden or specific}"
    elif overridden:
        return overridden
    # No branch produced an expression for this type.
    raise UnserializableField(fname, ftype, parent)
def _add_pack_union(self, fname, ftype, args, parent, metadata) -> str:
    """Generate and attach a helper method that packs a Union value.

    The helper tries the packing expression of every Union argument in
    order and returns the first one that does not raise; if all fail it
    raises ``InvalidFieldValue``.

    :param fname: field name, used in the helper name and the error
    :param ftype: the Union type, interpolated into the generated error
    :param args: the Union's type arguments
    :param parent: class owning the field, used in the helper name
    :param metadata: field metadata forwarded to ``_pack_value``
    :return: name of the generated method (unique per call)
    """
    lines = CodeLines()
    method_name = (
        f"__pack_union_{parent.__name__}_{fname}__"
        f"{uuid.uuid4().hex}"
    )
    lines.append(
        f"def {method_name}"
        f"(self,value, {self.get_to_dict_default_flag_values()}):"
    )
    with lines.indent():
        for packer in [
            self._pack_value(fname, arg_type, parent, metadata=metadata)
            for arg_type in args
        ]:
            lines.append("try:")
            with lines.indent():
                lines.append(f"return {packer}")
            # Catch Exception rather than everything, so KeyboardInterrupt
            # and SystemExit are not swallowed while trying alternatives.
            lines.append("except Exception:")
            with lines.indent():
                lines.append("pass")
        # The helper is an instance method, so there is no ``cls`` in its
        # scope; report the owning class through ``type(self)``.
        lines.append(
            f"raise InvalidFieldValue('{fname}',{ftype},value,type(self))"
        )
    lines.append(f"setattr(cls, '{method_name}', {method_name})")
    if self.get_config().debug:
        print(self.cls)
        print(lines.as_text())
    exec(lines.as_text(), globals(), self.__dict__)
    return method_name
def _add_unpack_union(self, fname, ftype, args, parent, metadata) -> str:
    """Generate and attach a helper classmethod that unpacks a Union value.

    The helper tries the unpacking expression of every Union argument in
    order and returns the first one that does not raise; if all fail it
    raises ``InvalidFieldValue``.

    :param fname: field name, used in the helper name and the error
    :param ftype: the Union type, interpolated into the generated error
    :param args: the Union's type arguments
    :param parent: class owning the field, used in the helper name
    :param metadata: field metadata forwarded to ``_unpack_field_value``
    :return: name of the generated classmethod (unique per call)
    """
    lines = CodeLines()
    method_name = (
        f"__unpack_union_{parent.__name__}_{fname}__"
        f"{uuid.uuid4().hex}"
    )
    lines.append("@classmethod")
    lines.append(
        f"def {method_name}"
        f"(cls,value,use_bytes=False,use_enum=False,use_datetime=False):"
    )
    with lines.indent():
        for unpacker in [
            self._unpack_field_value(
                fname, arg_type, parent, metadata=metadata
            )
            for arg_type in args
        ]:
            lines.append("try:")
            with lines.indent():
                lines.append(f"return {unpacker}")
            # Catch Exception rather than everything, so KeyboardInterrupt
            # and SystemExit are not swallowed while trying alternatives.
            lines.append("except Exception:")
            with lines.indent():
                lines.append("pass")
        lines.append(
            f"raise InvalidFieldValue('{fname}',{ftype},value,cls)"
        )
    lines.append(f"setattr(cls, '{method_name}', {method_name})")
    if self.get_config().debug:
        print(self.cls)
        print(lines.as_text())
    exec(lines.as_text(), globals(), self.__dict__)
    return method_name
# Editor is loading...
# Leave a Comment