# SPDX-License-Identifier: MIT
from __future__ import annotations
import abc
import contextlib
import copy
import enum
import functools
import inspect
import itertools
import linecache
import sys
import types
import typing
from operator import itemgetter
# We need to import _compat itself in addition to the _compat members to avoid
# having the thread-local in the globals here.
from . import _compat, _config, setters
from ._compat import (
PY_3_8_PLUS,
PY_3_10_PLUS,
PY_3_11_PLUS,
_AnnotationExtractor,
_get_annotations,
get_generic_base,
)
from .exceptions import (
DefaultAlreadySetError,
FrozenInstanceError,
NotAnAttrsClassError,
UnannotatedAttributeError,
)
# This is used at least twice, so cache it here.
_OBJ_SETATTR = object.__setattr__
_INIT_FACTORY_PAT = "__attr_factory_%s"
_CLASSVAR_PREFIXES = (
"typing.ClassVar",
"t.ClassVar",
"ClassVar",
"typing_extensions.ClassVar",
)
# we don't use a double-underscore prefix because that triggers
# name mangling when trying to create a slot for the field
# (when slots=True)
_HASH_CACHE_FIELD = "_attrs_cached_hash"
_EMPTY_METADATA_SINGLETON = types.MappingProxyType({})
# Unique object for unequivocal getattr() defaults.
_SENTINEL = object()
_DEFAULT_ON_SETATTR = setters.pipe(setters.convert, setters.validate)
class _Nothing(enum.Enum):
"""
Sentinel to indicate the lack of a value when `None` is ambiguous.
If extending attrs, you can use ``typing.Literal[NOTHING]`` to show
that a value may be ``NOTHING``.
.. versionchanged:: 21.1.0 ``bool(NOTHING)`` is now False.
.. versionchanged:: 22.2.0 ``NOTHING`` is now an ``enum.Enum`` variant.
"""
NOTHING = enum.auto()
def __repr__(self):
return "NOTHING"
def __bool__(self):
return False
NOTHING = _Nothing.NOTHING
"""
Sentinel to indicate the lack of a value when `None` is ambiguous.
"""
class _CacheHashWrapper(int):
"""
An integer subclass that pickles / copies as None
This is used for non-slots classes with ``cache_hash=True``, to avoid
serializing a potentially (even likely) invalid hash value. Since `None`
is the default value for uncalculated hashes, whenever this is copied,
the copy's value for the hash should automatically reset.
See GH #613 for more details.
"""
def __reduce__(self, _none_constructor=type(None), _args=()): # noqa: B008
return _none_constructor, _args
[docs]
def attrib(
default=NOTHING,
validator=None,
repr=True,
cmp=None,
hash=None,
init=True,
metadata=None,
type=None,
converter=None,
factory=None,
kw_only=False,
eq=None,
order=None,
on_setattr=None,
alias=None,
):
"""
Create a new field / attribute on a class.
Identical to `attrs.field`, except it's not keyword-only.
Consider using `attrs.field` in new code (``attr.ib`` will *never* go away,
though).
.. warning::
Does **nothing** unless the class is also decorated with
`attr.s` (or similar)!
.. versionadded:: 15.2.0 *convert*
.. versionadded:: 16.3.0 *metadata*
.. versionchanged:: 17.1.0 *validator* can be a ``list`` now.
.. versionchanged:: 17.1.0
*hash* is `None` and therefore mirrors *eq* by default.
.. versionadded:: 17.3.0 *type*
.. deprecated:: 17.4.0 *convert*
.. versionadded:: 17.4.0
*converter* as a replacement for the deprecated *convert* to achieve
consistency with other noun-based arguments.
.. versionadded:: 18.1.0
``factory=f`` is syntactic sugar for ``default=attr.Factory(f)``.
.. versionadded:: 18.2.0 *kw_only*
.. versionchanged:: 19.2.0 *convert* keyword argument removed.
.. versionchanged:: 19.2.0 *repr* also accepts a custom callable.
.. deprecated:: 19.2.0 *cmp* Removal on or after 2021-06-01.
.. versionadded:: 19.2.0 *eq* and *order*
.. versionadded:: 20.1.0 *on_setattr*
.. versionchanged:: 20.3.0 *kw_only* backported to Python 2
.. versionchanged:: 21.1.0
*eq*, *order*, and *cmp* also accept a custom callable
.. versionchanged:: 21.1.0 *cmp* undeprecated
.. versionadded:: 22.2.0 *alias*
"""
eq, eq_key, order, order_key = _determine_attrib_eq_order(
cmp, eq, order, True
)
if hash is not None and hash is not True and hash is not False:
msg = "Invalid value for hash. Must be True, False, or None."
raise TypeError(msg)
if factory is not None:
if default is not NOTHING:
msg = (
"The `default` and `factory` arguments are mutually exclusive."
)
raise ValueError(msg)
if not callable(factory):
msg = "The `factory` argument must be a callable."
raise ValueError(msg)
default = Factory(factory)
if metadata is None:
metadata = {}
# Apply syntactic sugar by auto-wrapping.
if isinstance(on_setattr, (list, tuple)):
on_setattr = setters.pipe(*on_setattr)
if validator and isinstance(validator, (list, tuple)):
validator = and_(*validator)
if converter and isinstance(converter, (list, tuple)):
converter = pipe(*converter)
return _CountingAttr(
default=default,
validator=validator,
repr=repr,
cmp=None,
hash=hash,
init=init,
converter=converter,
metadata=metadata,
type=type,
kw_only=kw_only,
eq=eq,
eq_key=eq_key,
order=order,
order_key=order_key,
on_setattr=on_setattr,
alias=alias,
)
def _compile_and_eval(script, globs, locs=None, filename=""):
"""
Evaluate the script with the given global (globs) and local (locs)
variables.
"""
bytecode = compile(script, filename, "exec")
eval(bytecode, globs, locs)
def _make_method(name, script, filename, globs, locals=None):
"""
Create the method with the script given and return the method object.
"""
locs = {} if locals is None else locals
# In order of debuggers like PDB being able to step through the code,
# we add a fake linecache entry.
count = 1
base_filename = filename
while True:
linecache_tuple = (
len(script),
None,
script.splitlines(True),
filename,
)
old_val = linecache.cache.setdefault(filename, linecache_tuple)
if old_val == linecache_tuple:
break
filename = f"{base_filename[:-1]}-{count}>"
count += 1
_compile_and_eval(script, globs, locs, filename)
return locs[name]
def _make_attr_tuple_class(cls_name, attr_names):
"""
Create a tuple subclass to hold `Attribute`s for an `attrs` class.
The subclass is a bare tuple with properties for names.
class MyClassAttributes(tuple):
__slots__ = ()
x = property(itemgetter(0))
"""
attr_class_name = f"{cls_name}Attributes"
attr_class_template = [
f"class {attr_class_name}(tuple):",
" __slots__ = ()",
]
if attr_names:
for i, attr_name in enumerate(attr_names):
attr_class_template.append(
f" {attr_name} = _attrs_property(_attrs_itemgetter({i}))"
)
else:
attr_class_template.append(" pass")
globs = {"_attrs_itemgetter": itemgetter, "_attrs_property": property}
_compile_and_eval("\n".join(attr_class_template), globs)
return globs[attr_class_name]
# Tuple class for extracted attributes from a class definition.
# `base_attrs` is a subset of `attrs`.
_Attributes = _make_attr_tuple_class(
"_Attributes",
[
# all attributes to build dunder methods for
"attrs",
# attributes that have been inherited
"base_attrs",
# map inherited attributes to their originating classes
"base_attrs_map",
],
)
def _is_class_var(annot):
"""
Check whether *annot* is a typing.ClassVar.
The string comparison hack is used to avoid evaluating all string
annotations which would put attrs-based classes at a performance
disadvantage compared to plain old classes.
"""
annot = str(annot)
# Annotation can be quoted.
if annot.startswith(("'", '"')) and annot.endswith(("'", '"')):
annot = annot[1:-1]
return annot.startswith(_CLASSVAR_PREFIXES)
def _has_own_attribute(cls, attrib_name):
"""
Check whether *cls* defines *attrib_name* (and doesn't just inherit it).
"""
return attrib_name in cls.__dict__
def _collect_base_attrs(cls, taken_attr_names):
"""
Collect attr.ibs from base classes of *cls*, except *taken_attr_names*.
"""
base_attrs = []
base_attr_map = {} # A dictionary of base attrs to their classes.
# Traverse the MRO and collect attributes.
for base_cls in reversed(cls.__mro__[1:-1]):
for a in getattr(base_cls, "__attrs_attrs__", []):
if a.inherited or a.name in taken_attr_names:
continue
a = a.evolve(inherited=True) # noqa: PLW2901
base_attrs.append(a)
base_attr_map[a.name] = base_cls
# For each name, only keep the freshest definition i.e. the furthest at the
# back. base_attr_map is fine because it gets overwritten with every new
# instance.
filtered = []
seen = set()
for a in reversed(base_attrs):
if a.name in seen:
continue
filtered.insert(0, a)
seen.add(a.name)
return filtered, base_attr_map
def _collect_base_attrs_broken(cls, taken_attr_names):
"""
Collect attr.ibs from base classes of *cls*, except *taken_attr_names*.
N.B. *taken_attr_names* will be mutated.
Adhere to the old incorrect behavior.
Notably it collects from the front and considers inherited attributes which
leads to the buggy behavior reported in #428.
"""
base_attrs = []
base_attr_map = {} # A dictionary of base attrs to their classes.
# Traverse the MRO and collect attributes.
for base_cls in cls.__mro__[1:-1]:
for a in getattr(base_cls, "__attrs_attrs__", []):
if a.name in taken_attr_names:
continue
a = a.evolve(inherited=True) # noqa: PLW2901
taken_attr_names.add(a.name)
base_attrs.append(a)
base_attr_map[a.name] = base_cls
return base_attrs, base_attr_map
def _transform_attrs(
cls, these, auto_attribs, kw_only, collect_by_mro, field_transformer
):
"""
Transform all `_CountingAttr`s on a class into `Attribute`s.
If *these* is passed, use that and don't look for them on the class.
If *collect_by_mro* is True, collect them in the correct MRO order,
otherwise use the old -- incorrect -- order. See #428.
Return an `_Attributes`.
"""
cd = cls.__dict__
anns = _get_annotations(cls)
if these is not None:
ca_list = list(these.items())
elif auto_attribs is True:
ca_names = {
name
for name, attr in cd.items()
if isinstance(attr, _CountingAttr)
}
ca_list = []
annot_names = set()
for attr_name, type in anns.items():
if _is_class_var(type):
continue
annot_names.add(attr_name)
a = cd.get(attr_name, NOTHING)
if not isinstance(a, _CountingAttr):
a = attrib() if a is NOTHING else attrib(default=a)
ca_list.append((attr_name, a))
unannotated = ca_names - annot_names
if len(unannotated) > 0:
raise UnannotatedAttributeError(
"The following `attr.ib`s lack a type annotation: "
+ ", ".join(
sorted(unannotated, key=lambda n: cd.get(n).counter)
)
+ "."
)
else:
ca_list = sorted(
(
(name, attr)
for name, attr in cd.items()
if isinstance(attr, _CountingAttr)
),
key=lambda e: e[1].counter,
)
own_attrs = [
Attribute.from_counting_attr(
name=attr_name, ca=ca, type=anns.get(attr_name)
)
for attr_name, ca in ca_list
]
if collect_by_mro:
base_attrs, base_attr_map = _collect_base_attrs(
cls, {a.name for a in own_attrs}
)
else:
base_attrs, base_attr_map = _collect_base_attrs_broken(
cls, {a.name for a in own_attrs}
)
if kw_only:
own_attrs = [a.evolve(kw_only=True) for a in own_attrs]
base_attrs = [a.evolve(kw_only=True) for a in base_attrs]
attrs = base_attrs + own_attrs
# Mandatory vs non-mandatory attr order only matters when they are part of
# the __init__ signature and when they aren't kw_only (which are moved to
# the end and can be mandatory or non-mandatory in any order, as they will
# be specified as keyword args anyway). Check the order of those attrs:
had_default = False
for a in (a for a in attrs if a.init is not False and a.kw_only is False):
if had_default is True and a.default is NOTHING:
msg = f"No mandatory attributes allowed after an attribute with a default value or factory. Attribute in question: {a!r}"
raise ValueError(msg)
if had_default is False and a.default is not NOTHING:
had_default = True
if field_transformer is not None:
attrs = field_transformer(cls, attrs)
# Resolve default field alias after executing field_transformer.
# This allows field_transformer to differentiate between explicit vs
# default aliases and supply their own defaults.
attrs = [
a.evolve(alias=_default_init_alias_for(a.name)) if not a.alias else a
for a in attrs
]
# Create AttrsClass *after* applying the field_transformer since it may
# add or remove attributes!
attr_names = [a.name for a in attrs]
AttrsClass = _make_attr_tuple_class(cls.__name__, attr_names)
return _Attributes((AttrsClass(attrs), base_attrs, base_attr_map))
def _make_cached_property_getattr(cached_properties, original_getattr, cls):
lines = [
# Wrapped to get `__class__` into closure cell for super()
# (It will be replaced with the newly constructed class after construction).
"def wrapper(_cls):",
" __class__ = _cls",
" def __getattr__(self, item, cached_properties=cached_properties, original_getattr=original_getattr, _cached_setattr_get=_cached_setattr_get):",
" func = cached_properties.get(item)",
" if func is not None:",
" result = func(self)",
" _setter = _cached_setattr_get(self)",
" _setter(item, result)",
" return result",
]
if original_getattr is not None:
lines.append(
" return original_getattr(self, item)",
)
else:
lines.extend(
[
" try:",
" return super().__getattribute__(item)",
" except AttributeError:",
" if not hasattr(super(), '__getattr__'):",
" raise",
" return super().__getattr__(item)",
" original_error = f\"'{self.__class__.__name__}' object has no attribute '{item}'\"",
" raise AttributeError(original_error)",
]
)
lines.extend(
[
" return __getattr__",
"__getattr__ = wrapper(_cls)",
]
)
unique_filename = _generate_unique_filename(cls, "getattr")
glob = {
"cached_properties": cached_properties,
"_cached_setattr_get": _OBJ_SETATTR.__get__,
"original_getattr": original_getattr,
}
return _make_method(
"__getattr__",
"\n".join(lines),
unique_filename,
glob,
locals={
"_cls": cls,
},
)
def _frozen_setattrs(self, name, value):
"""
Attached to frozen classes as __setattr__.
"""
if isinstance(self, BaseException) and name in (
"__cause__",
"__context__",
"__traceback__",
):
BaseException.__setattr__(self, name, value)
return
raise FrozenInstanceError()
def _frozen_delattrs(self, name):
"""
Attached to frozen classes as __delattr__.
"""
raise FrozenInstanceError()
class _ClassBuilder:
"""
Iteratively build *one* class.
"""
__slots__ = (
"_attr_names",
"_attrs",
"_base_attr_map",
"_base_names",
"_cache_hash",
"_cls",
"_cls_dict",
"_delete_attribs",
"_frozen",
"_has_pre_init",
"_pre_init_has_args",
"_has_post_init",
"_is_exc",
"_on_setattr",
"_slots",
"_weakref_slot",
"_wrote_own_setattr",
"_has_custom_setattr",
)
def __init__(
self,
cls,
these,
slots,
frozen,
weakref_slot,
getstate_setstate,
auto_attribs,
kw_only,
cache_hash,
is_exc,
collect_by_mro,
on_setattr,
has_custom_setattr,
field_transformer,
):
attrs, base_attrs, base_map = _transform_attrs(
cls,
these,
auto_attribs,
kw_only,
collect_by_mro,
field_transformer,
)
self._cls = cls
self._cls_dict = dict(cls.__dict__) if slots else {}
self._attrs = attrs
self._base_names = {a.name for a in base_attrs}
self._base_attr_map = base_map
self._attr_names = tuple(a.name for a in attrs)
self._slots = slots
self._frozen = frozen
self._weakref_slot = weakref_slot
self._cache_hash = cache_hash
self._has_pre_init = bool(getattr(cls, "__attrs_pre_init__", False))
self._pre_init_has_args = False
if self._has_pre_init:
# Check if the pre init method has more arguments than just `self`
# We want to pass arguments if pre init expects arguments
pre_init_func = cls.__attrs_pre_init__
pre_init_signature = inspect.signature(pre_init_func)
self._pre_init_has_args = len(pre_init_signature.parameters) > 1
self._has_post_init = bool(getattr(cls, "__attrs_post_init__", False))
self._delete_attribs = not bool(these)
self._is_exc = is_exc
self._on_setattr = on_setattr
self._has_custom_setattr = has_custom_setattr
self._wrote_own_setattr = False
self._cls_dict["__attrs_attrs__"] = self._attrs
if frozen:
self._cls_dict["__setattr__"] = _frozen_setattrs
self._cls_dict["__delattr__"] = _frozen_delattrs
self._wrote_own_setattr = True
elif on_setattr in (
_DEFAULT_ON_SETATTR,
setters.validate,
setters.convert,
):
has_validator = has_converter = False
for a in attrs:
if a.validator is not None:
has_validator = True
if a.converter is not None:
has_converter = True
if has_validator and has_converter:
break
if (
(
on_setattr == _DEFAULT_ON_SETATTR
and not (has_validator or has_converter)
)
or (on_setattr == setters.validate and not has_validator)
or (on_setattr == setters.convert and not has_converter)
):
# If class-level on_setattr is set to convert + validate, but
# there's no field to convert or validate, pretend like there's
# no on_setattr.
self._on_setattr = None
if getstate_setstate:
(
self._cls_dict["__getstate__"],
self._cls_dict["__setstate__"],
) = self._make_getstate_setstate()
def __repr__(self):
return f"<_ClassBuilder(cls={self._cls.__name__})>"
def build_class(self):
"""
Finalize class based on the accumulated configuration.
Builder cannot be used after calling this method.
"""
if self._slots is True:
cls = self._create_slots_class()
else:
cls = self._patch_original_class()
if PY_3_10_PLUS:
cls = abc.update_abstractmethods(cls)
# The method gets only called if it's not inherited from a base class.
# _has_own_attribute does NOT work properly for classmethods.
if (
getattr(cls, "__attrs_init_subclass__", None)
and "__attrs_init_subclass__" not in cls.__dict__
):
cls.__attrs_init_subclass__()
return cls
def _patch_original_class(self):
"""
Apply accumulated methods and return the class.
"""
cls = self._cls
base_names = self._base_names
# Clean class of attribute definitions (`attr.ib()`s).
if self._delete_attribs:
for name in self._attr_names:
if (
name not in base_names
and getattr(cls, name, _SENTINEL) is not _SENTINEL
):
# An AttributeError can happen if a base class defines a
# class variable and we want to set an attribute with the
# same name by using only a type annotation.
with contextlib.suppress(AttributeError):
delattr(cls, name)
# Attach our dunder methods.
for name, value in self._cls_dict.items():
setattr(cls, name, value)
# If we've inherited an attrs __setattr__ and don't write our own,
# reset it to object's.
if not self._wrote_own_setattr and getattr(
cls, "__attrs_own_setattr__", False
):
cls.__attrs_own_setattr__ = False
if not self._has_custom_setattr:
cls.__setattr__ = _OBJ_SETATTR
return cls
def _create_slots_class(self):
"""
Build and return a new class with a `__slots__` attribute.
"""
cd = {
k: v
for k, v in self._cls_dict.items()
if k not in (*tuple(self._attr_names), "__dict__", "__weakref__")
}
# If our class doesn't have its own implementation of __setattr__
# (either from the user or by us), check the bases, if one of them has
# an attrs-made __setattr__, that needs to be reset. We don't walk the
# MRO because we only care about our immediate base classes.
# XXX: This can be confused by subclassing a slotted attrs class with
# XXX: a non-attrs class and subclass the resulting class with an attrs
# XXX: class. See `test_slotted_confused` for details. For now that's
# XXX: OK with us.
if not self._wrote_own_setattr:
cd["__attrs_own_setattr__"] = False
if not self._has_custom_setattr:
for base_cls in self._cls.__bases__:
if base_cls.__dict__.get("__attrs_own_setattr__", False):
cd["__setattr__"] = _OBJ_SETATTR
break
# Traverse the MRO to collect existing slots
# and check for an existing __weakref__.
existing_slots = {}
weakref_inherited = False
for base_cls in self._cls.__mro__[1:-1]:
if base_cls.__dict__.get("__weakref__", None) is not None:
weakref_inherited = True
existing_slots.update(
{
name: getattr(base_cls, name)
for name in getattr(base_cls, "__slots__", [])
}
)
base_names = set(self._base_names)
names = self._attr_names
if (
self._weakref_slot
and "__weakref__" not in getattr(self._cls, "__slots__", ())
and "__weakref__" not in names
and not weakref_inherited
):
names += ("__weakref__",)
if PY_3_8_PLUS:
cached_properties = {
name: cached_property.func
for name, cached_property in cd.items()
if isinstance(cached_property, functools.cached_property)
}
else:
# `functools.cached_property` was introduced in 3.8.
# So can't be used before this.
cached_properties = {}
# Collect methods with a `__class__` reference that are shadowed in the new class.
# To know to update them.
additional_closure_functions_to_update = []
if cached_properties:
class_annotations = _get_annotations(self._cls)
for name, func in cached_properties.items():
# Add cached properties to names for slotting.
names += (name,)
# Clear out function from class to avoid clashing.
del cd[name]
additional_closure_functions_to_update.append(func)
annotation = inspect.signature(func).return_annotation
if annotation is not inspect.Parameter.empty:
class_annotations[name] = annotation
original_getattr = cd.get("__getattr__")
if original_getattr is not None:
additional_closure_functions_to_update.append(original_getattr)
cd["__getattr__"] = _make_cached_property_getattr(
cached_properties, original_getattr, self._cls
)
# We only add the names of attributes that aren't inherited.
# Setting __slots__ to inherited attributes wastes memory.
slot_names = [name for name in names if name not in base_names]
# There are slots for attributes from current class
# that are defined in parent classes.
# As their descriptors may be overridden by a child class,
# we collect them here and update the class dict
reused_slots = {
slot: slot_descriptor
for slot, slot_descriptor in existing_slots.items()
if slot in slot_names
}
slot_names = [name for name in slot_names if name not in reused_slots]
cd.update(reused_slots)
if self._cache_hash:
slot_names.append(_HASH_CACHE_FIELD)
cd["__slots__"] = tuple(slot_names)
cd["__qualname__"] = self._cls.__qualname__
# Create new class based on old class and our methods.
cls = type(self._cls)(self._cls.__name__, self._cls.__bases__, cd)
# The following is a fix for
# <https://github.com/python-attrs/attrs/issues/102>.
# If a method mentions `__class__` or uses the no-arg super(), the
# compiler will bake a reference to the class in the method itself
# as `method.__closure__`. Since we replace the class with a
# clone, we rewrite these references so it keeps working.
for item in itertools.chain(
cls.__dict__.values(), additional_closure_functions_to_update
):
if isinstance(item, (classmethod, staticmethod)):
# Class- and staticmethods hide their functions inside.
# These might need to be rewritten as well.
closure_cells = getattr(item.__func__, "__closure__", None)
elif isinstance(item, property):
# Workaround for property `super()` shortcut (PY3-only).
# There is no universal way for other descriptors.
closure_cells = getattr(item.fget, "__closure__", None)
else:
closure_cells = getattr(item, "__closure__", None)
if not closure_cells: # Catch None or the empty list.
continue
for cell in closure_cells:
try:
match = cell.cell_contents is self._cls
except ValueError: # noqa: PERF203
# ValueError: Cell is empty
pass
else:
if match:
cell.cell_contents = cls
return cls
def add_repr(self, ns):
self._cls_dict["__repr__"] = self._add_method_dunders(
_make_repr(self._attrs, ns, self._cls)
)
return self
def add_str(self):
repr = self._cls_dict.get("__repr__")
if repr is None:
msg = "__str__ can only be generated if a __repr__ exists."
raise ValueError(msg)
def __str__(self):
return self.__repr__()
self._cls_dict["__str__"] = self._add_method_dunders(__str__)
return self
def _make_getstate_setstate(self):
"""
Create custom __setstate__ and __getstate__ methods.
"""
# __weakref__ is not writable.
state_attr_names = tuple(
an for an in self._attr_names if an != "__weakref__"
)
def slots_getstate(self):
"""
Automatically created by attrs.
"""
return {name: getattr(self, name) for name in state_attr_names}
hash_caching_enabled = self._cache_hash
def slots_setstate(self, state):
"""
Automatically created by attrs.
"""
__bound_setattr = _OBJ_SETATTR.__get__(self)
if isinstance(state, tuple):
# Backward compatibility with attrs instances pickled with
# attrs versions before v22.2.0 which stored tuples.
for name, value in zip(state_attr_names, state):
__bound_setattr(name, value)
else:
for name in state_attr_names:
if name in state:
__bound_setattr(name, state[name])
# The hash code cache is not included when the object is
# serialized, but it still needs to be initialized to None to
# indicate that the first call to __hash__ should be a cache
# miss.
if hash_caching_enabled:
__bound_setattr(_HASH_CACHE_FIELD, None)
return slots_getstate, slots_setstate
def make_unhashable(self):
self._cls_dict["__hash__"] = None
return self
def add_hash(self):
self._cls_dict["__hash__"] = self._add_method_dunders(
_make_hash(
self._cls,
self._attrs,
frozen=self._frozen,
cache_hash=self._cache_hash,
)
)
return self
def add_init(self):
self._cls_dict["__init__"] = self._add_method_dunders(
_make_init(
self._cls,
self._attrs,
self._has_pre_init,
self._pre_init_has_args,
self._has_post_init,
self._frozen,
self._slots,
self._cache_hash,
self._base_attr_map,
self._is_exc,
self._on_setattr,
attrs_init=False,
)
)
return self
def add_match_args(self):
self._cls_dict["__match_args__"] = tuple(
field.name
for field in self._attrs
if field.init and not field.kw_only
)
def add_attrs_init(self):
self._cls_dict["__attrs_init__"] = self._add_method_dunders(
_make_init(
self._cls,
self._attrs,
self._has_pre_init,
self._pre_init_has_args,
self._has_post_init,
self._frozen,
self._slots,
self._cache_hash,
self._base_attr_map,
self._is_exc,
self._on_setattr,
attrs_init=True,
)
)
return self
def add_eq(self):
cd = self._cls_dict
cd["__eq__"] = self._add_method_dunders(
_make_eq(self._cls, self._attrs)
)
cd["__ne__"] = self._add_method_dunders(_make_ne())
return self
def add_order(self):
cd = self._cls_dict
cd["__lt__"], cd["__le__"], cd["__gt__"], cd["__ge__"] = (
self._add_method_dunders(meth)
for meth in _make_order(self._cls, self._attrs)
)
return self
def add_setattr(self):
if self._frozen:
return self
sa_attrs = {}
for a in self._attrs:
on_setattr = a.on_setattr or self._on_setattr
if on_setattr and on_setattr is not setters.NO_OP:
sa_attrs[a.name] = a, on_setattr
if not sa_attrs:
return self
if self._has_custom_setattr:
# We need to write a __setattr__ but there already is one!
msg = "Can't combine custom __setattr__ with on_setattr hooks."
raise ValueError(msg)
# docstring comes from _add_method_dunders
def __setattr__(self, name, val):
try:
a, hook = sa_attrs[name]
except KeyError:
nval = val
else:
nval = hook(self, a, val)
_OBJ_SETATTR(self, name, nval)
self._cls_dict["__attrs_own_setattr__"] = True
self._cls_dict["__setattr__"] = self._add_method_dunders(__setattr__)
self._wrote_own_setattr = True
return self
def _add_method_dunders(self, method):
"""
Add __module__ and __qualname__ to a *method* if possible.
"""
with contextlib.suppress(AttributeError):
method.__module__ = self._cls.__module__
with contextlib.suppress(AttributeError):
method.__qualname__ = f"{self._cls.__qualname__}.{method.__name__}"
with contextlib.suppress(AttributeError):
method.__doc__ = (
"Method generated by attrs for class "
f"{self._cls.__qualname__}."
)
return method
def _determine_attrs_eq_order(cmp, eq, order, default_eq):
"""
Validate the combination of *cmp*, *eq*, and *order*. Derive the effective
values of eq and order. If *eq* is None, set it to *default_eq*.
"""
if cmp is not None and any((eq is not None, order is not None)):
msg = "Don't mix `cmp` with `eq' and `order`."
raise ValueError(msg)
# cmp takes precedence due to bw-compatibility.
if cmp is not None:
return cmp, cmp
# If left None, equality is set to the specified default and ordering
# mirrors equality.
if eq is None:
eq = default_eq
if order is None:
order = eq
if eq is False and order is True:
msg = "`order` can only be True if `eq` is True too."
raise ValueError(msg)
return eq, order
def _determine_attrib_eq_order(cmp, eq, order, default_eq):
"""
Validate the combination of *cmp*, *eq*, and *order*. Derive the effective
values of eq and order. If *eq* is None, set it to *default_eq*.
"""
if cmp is not None and any((eq is not None, order is not None)):
msg = "Don't mix `cmp` with `eq' and `order`."
raise ValueError(msg)
def decide_callable_or_boolean(value):
"""
Decide whether a key function is used.
"""
if callable(value):
value, key = True, value
else:
key = None
return value, key
# cmp takes precedence due to bw-compatibility.
if cmp is not None:
cmp, cmp_key = decide_callable_or_boolean(cmp)
return cmp, cmp_key, cmp, cmp_key
# If left None, equality is set to the specified default and ordering
# mirrors equality.
if eq is None:
eq, eq_key = default_eq, None
else:
eq, eq_key = decide_callable_or_boolean(eq)
if order is None:
order, order_key = eq, eq_key
else:
order, order_key = decide_callable_or_boolean(order)
if eq is False and order is True:
msg = "`order` can only be True if `eq` is True too."
raise ValueError(msg)
return eq, eq_key, order, order_key
def _determine_whether_to_implement(
cls, flag, auto_detect, dunders, default=True
):
"""
Check whether we should implement a set of methods for *cls*.
*flag* is the argument passed into @attr.s like 'init', *auto_detect* the
same as passed into @attr.s and *dunders* is a tuple of attribute names
whose presence signal that the user has implemented it themselves.
Return *default* if no reason for either for or against is found.
"""
if flag is True or flag is False:
return flag
if flag is None and auto_detect is False:
return default
# Logically, flag is None and auto_detect is True here.
for dunder in dunders:
if _has_own_attribute(cls, dunder):
return False
return default
[docs]
def attrs(
maybe_cls=None,
these=None,
repr_ns=None,
repr=None,
cmp=None,
hash=None,
init=None,
slots=False,
frozen=False,
weakref_slot=True,
str=False,
auto_attribs=False,
kw_only=False,
cache_hash=False,
auto_exc=False,
eq=None,
order=None,
auto_detect=False,
collect_by_mro=False,
getstate_setstate=None,
on_setattr=None,
field_transformer=None,
match_args=True,
unsafe_hash=None,
):
r"""
A class decorator that adds :term:`dunder methods` according to the
specified attributes using `attr.ib` or the *these* argument.
Consider using `attrs.define` / `attrs.frozen` in new code (``attr.s`` will
*never* go away, though).
Args:
repr_ns (str):
When using nested classes, there was no way in Python 2 to
automatically detect that. This argument allows to set a custom
name for a more meaningful ``repr`` output. This argument is
pointless in Python 3 and is therefore deprecated.
.. caution::
Refer to `attrs.define` for the rest of the parameters, but note that they
can have different defaults.
Notably, leaving *on_setattr* as `None` will **not** add any hooks.
.. versionadded:: 16.0.0 *slots*
.. versionadded:: 16.1.0 *frozen*
.. versionadded:: 16.3.0 *str*
.. versionadded:: 16.3.0 Support for ``__attrs_post_init__``.
.. versionchanged:: 17.1.0
*hash* supports `None` as value which is also the default now.
.. versionadded:: 17.3.0 *auto_attribs*
.. versionchanged:: 18.1.0
If *these* is passed, no attributes are deleted from the class body.
.. versionchanged:: 18.1.0 If *these* is ordered, the order is retained.
.. versionadded:: 18.2.0 *weakref_slot*
.. deprecated:: 18.2.0
``__lt__``, ``__le__``, ``__gt__``, and ``__ge__`` now raise a
`DeprecationWarning` if the classes compared are subclasses of
each other. ``__eq`` and ``__ne__`` never tried to compared subclasses
to each other.
.. versionchanged:: 19.2.0
``__lt__``, ``__le__``, ``__gt__``, and ``__ge__`` now do not consider
subclasses comparable anymore.
.. versionadded:: 18.2.0 *kw_only*
.. versionadded:: 18.2.0 *cache_hash*
.. versionadded:: 19.1.0 *auto_exc*
.. deprecated:: 19.2.0 *cmp* Removal on or after 2021-06-01.
.. versionadded:: 19.2.0 *eq* and *order*
.. versionadded:: 20.1.0 *auto_detect*
.. versionadded:: 20.1.0 *collect_by_mro*
.. versionadded:: 20.1.0 *getstate_setstate*
.. versionadded:: 20.1.0 *on_setattr*
.. versionadded:: 20.3.0 *field_transformer*
.. versionchanged:: 21.1.0
``init=False`` injects ``__attrs_init__``
.. versionchanged:: 21.1.0 Support for ``__attrs_pre_init__``
.. versionchanged:: 21.1.0 *cmp* undeprecated
.. versionadded:: 21.3.0 *match_args*
.. versionadded:: 22.2.0
*unsafe_hash* as an alias for *hash* (for :pep:`681` compliance).
.. deprecated:: 24.1.0 *repr_ns*
.. versionchanged:: 24.1.0
Instances are not compared as tuples of attributes anymore, but using a
big ``and`` condition. This is faster and has more correct behavior for
uncomparable values like `math.nan`.
.. versionadded:: 24.1.0
If a class has an *inherited* classmethod called
``__attrs_init_subclass__``, it is executed after the class is created.
.. deprecated:: 24.1.0 *hash* is deprecated in favor of *unsafe_hash*.
"""
if repr_ns is not None:
import warnings
warnings.warn(
DeprecationWarning(
"The `repr_ns` argument is deprecated and will be removed in or after August 2025."
),
stacklevel=2,
)
eq_, order_ = _determine_attrs_eq_order(cmp, eq, order, None)
# unsafe_hash takes precedence due to PEP 681.
if unsafe_hash is not None:
hash = unsafe_hash
if isinstance(on_setattr, (list, tuple)):
on_setattr = setters.pipe(*on_setattr)
def wrap(cls):
is_frozen = frozen or _has_frozen_base_class(cls)
is_exc = auto_exc is True and issubclass(cls, BaseException)
has_own_setattr = auto_detect and _has_own_attribute(
cls, "__setattr__"
)
if has_own_setattr and is_frozen:
msg = "Can't freeze a class with a custom __setattr__."
raise ValueError(msg)
builder = _ClassBuilder(
cls,
these,
slots,
is_frozen,
weakref_slot,
_determine_whether_to_implement(
cls,
getstate_setstate,
auto_detect,
("__getstate__", "__setstate__"),
default=slots,
),
auto_attribs,
kw_only,
cache_hash,
is_exc,
collect_by_mro,
on_setattr,
has_own_setattr,
field_transformer,
)
if _determine_whether_to_implement(
cls, repr, auto_detect, ("__repr__",)
):
builder.add_repr(repr_ns)
if str is True:
builder.add_str()
eq = _determine_whether_to_implement(
cls, eq_, auto_detect, ("__eq__", "__ne__")
)
if not is_exc and eq is True:
builder.add_eq()
if not is_exc and _determine_whether_to_implement(
cls, order_, auto_detect, ("__lt__", "__le__", "__gt__", "__ge__")
):
builder.add_order()
builder.add_setattr()
nonlocal hash
if (
hash is None
and auto_detect is True
and _has_own_attribute(cls, "__hash__")
):
hash = False
if hash is not True and hash is not False and hash is not None:
# Can't use `hash in` because 1 == True for example.
msg = "Invalid value for hash. Must be True, False, or None."
raise TypeError(msg)
if hash is False or (hash is None and eq is False) or is_exc:
# Don't do anything. Should fall back to __object__'s __hash__
# which is by id.
if cache_hash:
msg = "Invalid value for cache_hash. To use hash caching, hashing must be either explicitly or implicitly enabled."
raise TypeError(msg)
elif hash is True or (
hash is None and eq is True and is_frozen is True
):
# Build a __hash__ if told so, or if it's safe.
builder.add_hash()
else:
# Raise TypeError on attempts to hash.
if cache_hash:
msg = "Invalid value for cache_hash. To use hash caching, hashing must be either explicitly or implicitly enabled."
raise TypeError(msg)
builder.make_unhashable()
if _determine_whether_to_implement(
cls, init, auto_detect, ("__init__",)
):
builder.add_init()
else:
builder.add_attrs_init()
if cache_hash:
msg = "Invalid value for cache_hash. To use hash caching, init must be True."
raise TypeError(msg)
if (
PY_3_10_PLUS
and match_args
and not _has_own_attribute(cls, "__match_args__")
):
builder.add_match_args()
return builder.build_class()
# maybe_cls's type depends on the usage of the decorator. It's a class
# if it's used as `@attrs` but `None` if used as `@attrs()`.
if maybe_cls is None:
return wrap
return wrap(maybe_cls)
_attrs = attrs
"""
Internal alias so we can use it in functions that take an argument called
*attrs*.
"""
def _has_frozen_base_class(cls):
"""
Check whether *cls* has a frozen ancestor by looking at its
__setattr__.
"""
return cls.__setattr__ is _frozen_setattrs
def _generate_unique_filename(cls, func_name):
"""
Create a "filename" suitable for a function being generated.
"""
return (
f"<attrs generated {func_name} {cls.__module__}."
f"{getattr(cls, '__qualname__', cls.__name__)}>"
)
def _make_hash(cls, attrs, frozen, cache_hash):
attrs = tuple(
a for a in attrs if a.hash is True or (a.hash is None and a.eq is True)
)
tab = " "
unique_filename = _generate_unique_filename(cls, "hash")
type_hash = hash(unique_filename)
# If eq is custom generated, we need to include the functions in globs
globs = {}
hash_def = "def __hash__(self"
hash_func = "hash(("
closing_braces = "))"
if not cache_hash:
hash_def += "):"
else:
hash_def += ", *"
hash_def += ", _cache_wrapper=__import__('attr._make')._make._CacheHashWrapper):"
hash_func = "_cache_wrapper(" + hash_func
closing_braces += ")"
method_lines = [hash_def]
def append_hash_computation_lines(prefix, indent):
"""
Generate the code for actually computing the hash code.
Below this will either be returned directly or used to compute
a value which is then cached, depending on the value of cache_hash
"""
method_lines.extend(
[
indent + prefix + hash_func,
indent + f" {type_hash},",
]
)
for a in attrs:
if a.eq_key:
cmp_name = f"_{a.name}_key"
globs[cmp_name] = a.eq_key
method_lines.append(
indent + f" {cmp_name}(self.{a.name}),"
)
else:
method_lines.append(indent + f" self.{a.name},")
method_lines.append(indent + " " + closing_braces)
if cache_hash:
method_lines.append(tab + f"if self.{_HASH_CACHE_FIELD} is None:")
if frozen:
append_hash_computation_lines(
f"object.__setattr__(self, '{_HASH_CACHE_FIELD}', ", tab * 2
)
method_lines.append(tab * 2 + ")") # close __setattr__
else:
append_hash_computation_lines(
f"self.{_HASH_CACHE_FIELD} = ", tab * 2
)
method_lines.append(tab + f"return self.{_HASH_CACHE_FIELD}")
else:
append_hash_computation_lines("return ", tab)
script = "\n".join(method_lines)
return _make_method("__hash__", script, unique_filename, globs)
def _add_hash(cls, attrs):
"""
Add a hash method to *cls*.
"""
cls.__hash__ = _make_hash(cls, attrs, frozen=False, cache_hash=False)
return cls
def _make_ne():
"""
Create __ne__ method.
"""
def __ne__(self, other):
"""
Check equality and either forward a NotImplemented or
return the result negated.
"""
result = self.__eq__(other)
if result is NotImplemented:
return NotImplemented
return not result
return __ne__
def _make_eq(cls, attrs):
"""
Create __eq__ method for *cls* with *attrs*.
"""
attrs = [a for a in attrs if a.eq]
unique_filename = _generate_unique_filename(cls, "eq")
lines = [
"def __eq__(self, other):",
" if other.__class__ is not self.__class__:",
" return NotImplemented",
]
# We can't just do a big self.x = other.x and... clause due to
# irregularities like nan == nan is false but (nan,) == (nan,) is true.
globs = {}
if attrs:
lines.append(" return (")
for a in attrs:
if a.eq_key:
cmp_name = f"_{a.name}_key"
# Add the key function to the global namespace
# of the evaluated function.
globs[cmp_name] = a.eq_key
lines.append(
f" {cmp_name}(self.{a.name}) == {cmp_name}(other.{a.name})"
)
else:
lines.append(f" self.{a.name} == other.{a.name}")
if a is not attrs[-1]:
lines[-1] = f"{lines[-1]} and"
lines.append(" )")
else:
lines.append(" return True")
script = "\n".join(lines)
return _make_method("__eq__", script, unique_filename, globs)
def _make_order(cls, attrs):
"""
Create ordering methods for *cls* with *attrs*.
"""
attrs = [a for a in attrs if a.order]
def attrs_to_tuple(obj):
"""
Save us some typing.
"""
return tuple(
key(value) if key else value
for value, key in (
(getattr(obj, a.name), a.order_key) for a in attrs
)
)
def __lt__(self, other):
"""
Automatically created by attrs.
"""
if other.__class__ is self.__class__:
return attrs_to_tuple(self) < attrs_to_tuple(other)
return NotImplemented
def __le__(self, other):
"""
Automatically created by attrs.
"""
if other.__class__ is self.__class__:
return attrs_to_tuple(self) <= attrs_to_tuple(other)
return NotImplemented
def __gt__(self, other):
"""
Automatically created by attrs.
"""
if other.__class__ is self.__class__:
return attrs_to_tuple(self) > attrs_to_tuple(other)
return NotImplemented
def __ge__(self, other):
"""
Automatically created by attrs.
"""
if other.__class__ is self.__class__:
return attrs_to_tuple(self) >= attrs_to_tuple(other)
return NotImplemented
return __lt__, __le__, __gt__, __ge__
def _add_eq(cls, attrs=None):
"""
Add equality methods to *cls* with *attrs*.
"""
if attrs is None:
attrs = cls.__attrs_attrs__
cls.__eq__ = _make_eq(cls, attrs)
cls.__ne__ = _make_ne()
return cls
def _make_repr(attrs, ns, cls):
unique_filename = _generate_unique_filename(cls, "repr")
# Figure out which attributes to include, and which function to use to
# format them. The a.repr value can be either bool or a custom
# callable.
attr_names_with_reprs = tuple(
(a.name, (repr if a.repr is True else a.repr), a.init)
for a in attrs
if a.repr is not False
)
globs = {
name + "_repr": r for name, r, _ in attr_names_with_reprs if r != repr
}
globs["_compat"] = _compat
globs["AttributeError"] = AttributeError
globs["NOTHING"] = NOTHING
attribute_fragments = []
for name, r, i in attr_names_with_reprs:
accessor = (
"self." + name if i else 'getattr(self, "' + name + '", NOTHING)'
)
fragment = (
"%s={%s!r}" % (name, accessor)
if r == repr
else "%s={%s_repr(%s)}" % (name, name, accessor)
)
attribute_fragments.append(fragment)
repr_fragment = ", ".join(attribute_fragments)
if ns is None:
cls_name_fragment = '{self.__class__.__qualname__.rsplit(">.", 1)[-1]}'
else:
cls_name_fragment = ns + ".{self.__class__.__name__}"
lines = [
"def __repr__(self):",
" try:",
" already_repring = _compat.repr_context.already_repring",
" except AttributeError:",
" already_repring = {id(self),}",
" _compat.repr_context.already_repring = already_repring",
" else:",
" if id(self) in already_repring:",
" return '...'",
" else:",
" already_repring.add(id(self))",
" try:",
f" return f'{cls_name_fragment}({repr_fragment})'",
" finally:",
" already_repring.remove(id(self))",
]
return _make_method(
"__repr__", "\n".join(lines), unique_filename, globs=globs
)
def _add_repr(cls, ns=None, attrs=None):
"""
Add a repr method to *cls*.
"""
if attrs is None:
attrs = cls.__attrs_attrs__
cls.__repr__ = _make_repr(attrs, ns, cls)
return cls
def fields(cls):
"""
Return the tuple of *attrs* attributes for a class.
The tuple also allows accessing the fields by their names (see below for
examples).
Args:
cls (type): Class to introspect.
Raises:
TypeError: If *cls* is not a class.
attrs.exceptions.NotAnAttrsClassError:
If *cls* is not an *attrs* class.
Returns:
tuple (with name accessors) of `attrs.Attribute`
.. versionchanged:: 16.2.0 Returned tuple allows accessing the fields
by name.
.. versionchanged:: 23.1.0 Add support for generic classes.
"""
generic_base = get_generic_base(cls)
if generic_base is None and not isinstance(cls, type):
msg = "Passed object must be a class."
raise TypeError(msg)
attrs = getattr(cls, "__attrs_attrs__", None)
if attrs is None:
if generic_base is not None:
attrs = getattr(generic_base, "__attrs_attrs__", None)
if attrs is not None:
# Even though this is global state, stick it on here to speed
# it up. We rely on `cls` being cached for this to be
# efficient.
cls.__attrs_attrs__ = attrs
return attrs
msg = f"{cls!r} is not an attrs-decorated class."
raise NotAnAttrsClassError(msg)
return attrs
def fields_dict(cls):
"""
Return an ordered dictionary of *attrs* attributes for a class, whose keys
are the attribute names.
Args:
cls (type): Class to introspect.
Raises:
TypeError: If *cls* is not a class.
attrs.exceptions.NotAnAttrsClassError:
If *cls* is not an *attrs* class.
Returns:
dict[str, attrs.Attribute]: Dict of attribute name to definition
.. versionadded:: 18.1.0
"""
if not isinstance(cls, type):
msg = "Passed object must be a class."
raise TypeError(msg)
attrs = getattr(cls, "__attrs_attrs__", None)
if attrs is None:
msg = f"{cls!r} is not an attrs-decorated class."
raise NotAnAttrsClassError(msg)
return {a.name: a for a in attrs}
def validate(inst):
"""
Validate all attributes on *inst* that have a validator.
Leaves all exceptions through.
Args:
inst: Instance of a class with *attrs* attributes.
"""
if _config._run_validators is False:
return
for a in fields(inst.__class__):
v = a.validator
if v is not None:
v(inst, a, getattr(inst, a.name))
def _is_slot_attr(a_name, base_attr_map):
"""
Check if the attribute name comes from a slot class.
"""
cls = base_attr_map.get(a_name)
return cls and "__slots__" in cls.__dict__
def _make_init(
cls,
attrs,
pre_init,
pre_init_has_args,
post_init,
frozen,
slots,
cache_hash,
base_attr_map,
is_exc,
cls_on_setattr,
attrs_init,
):
has_cls_on_setattr = (
cls_on_setattr is not None and cls_on_setattr is not setters.NO_OP
)
if frozen and has_cls_on_setattr:
msg = "Frozen classes can't use on_setattr."
raise ValueError(msg)
needs_cached_setattr = cache_hash or frozen
filtered_attrs = []
attr_dict = {}
for a in attrs:
if not a.init and a.default is NOTHING:
continue
filtered_attrs.append(a)
attr_dict[a.name] = a
if a.on_setattr is not None:
if frozen is True:
msg = "Frozen classes can't use on_setattr."
raise ValueError(msg)
needs_cached_setattr = True
elif has_cls_on_setattr and a.on_setattr is not setters.NO_OP:
needs_cached_setattr = True
unique_filename = _generate_unique_filename(cls, "init")
script, globs, annotations = _attrs_to_init_script(
filtered_attrs,
frozen,
slots,
pre_init,
pre_init_has_args,
post_init,
cache_hash,
base_attr_map,
is_exc,
needs_cached_setattr,
has_cls_on_setattr,
"__attrs_init__" if attrs_init else "__init__",
)
if cls.__module__ in sys.modules:
# This makes typing.get_type_hints(CLS.__init__) resolve string types.
globs.update(sys.modules[cls.__module__].__dict__)
globs.update({"NOTHING": NOTHING, "attr_dict": attr_dict})
if needs_cached_setattr:
# Save the lookup overhead in __init__ if we need to circumvent
# setattr hooks.
globs["_cached_setattr_get"] = _OBJ_SETATTR.__get__
init = _make_method(
"__attrs_init__" if attrs_init else "__init__",
script,
unique_filename,
globs,
)
init.__annotations__ = annotations
return init
def _setattr(attr_name: str, value_var: str, has_on_setattr: bool) -> str:
"""
Use the cached object.setattr to set *attr_name* to *value_var*.
"""
return f"_setattr('{attr_name}', {value_var})"
def _setattr_with_converter(
attr_name: str, value_var: str, has_on_setattr: bool, converter: Converter
) -> str:
"""
Use the cached object.setattr to set *attr_name* to *value_var*, but run
its converter first.
"""
return f"_setattr('{attr_name}', {converter._fmt_converter_call(attr_name, value_var)})"
def _assign(attr_name: str, value: str, has_on_setattr: bool) -> str:
"""
Unless *attr_name* has an on_setattr hook, use normal assignment. Otherwise
relegate to _setattr.
"""
if has_on_setattr:
return _setattr(attr_name, value, True)
return f"self.{attr_name} = {value}"
def _assign_with_converter(
attr_name: str, value_var: str, has_on_setattr: bool, converter: Converter
) -> str:
"""
Unless *attr_name* has an on_setattr hook, use normal assignment after
conversion. Otherwise relegate to _setattr_with_converter.
"""
if has_on_setattr:
return _setattr_with_converter(attr_name, value_var, True, converter)
return f"self.{attr_name} = {converter._fmt_converter_call(attr_name, value_var)}"
def _determine_setters(
frozen: bool, slots: bool, base_attr_map: dict[str, type]
):
"""
Determine the correct setter functions based on whether a class is frozen
and/or slotted.
"""
if frozen is True:
if slots is True:
return (), _setattr, _setattr_with_converter
# Dict frozen classes assign directly to __dict__.
# But only if the attribute doesn't come from an ancestor slot
# class.
# Note _inst_dict will be used again below if cache_hash is True
def fmt_setter(
attr_name: str, value_var: str, has_on_setattr: bool
) -> str:
if _is_slot_attr(attr_name, base_attr_map):
return _setattr(attr_name, value_var, has_on_setattr)
return f"_inst_dict['{attr_name}'] = {value_var}"
def fmt_setter_with_converter(
attr_name: str,
value_var: str,
has_on_setattr: bool,
converter: Converter,
) -> str:
if has_on_setattr or _is_slot_attr(attr_name, base_attr_map):
return _setattr_with_converter(
attr_name, value_var, has_on_setattr, converter
)
return f"_inst_dict['{attr_name}'] = {converter._fmt_converter_call(attr_name, value_var)}"
return (
("_inst_dict = self.__dict__",),
fmt_setter,
fmt_setter_with_converter,
)
# Not frozen -- we can just assign directly.
return (), _assign, _assign_with_converter
def _attrs_to_init_script(
attrs: list[Attribute],
is_frozen: bool,
is_slotted: bool,
call_pre_init: bool,
pre_init_has_args: bool,
call_post_init: bool,
does_cache_hash: bool,
base_attr_map: dict[str, type],
is_exc: bool,
needs_cached_setattr: bool,
has_cls_on_setattr: bool,
method_name: str,
) -> tuple[str, dict, dict]:
"""
Return a script of an initializer for *attrs*, a dict of globals, and
annotations for the initializer.
The globals are required by the generated script.
"""
lines = ["self.__attrs_pre_init__()"] if call_pre_init else []
if needs_cached_setattr:
lines.append(
# Circumvent the __setattr__ descriptor to save one lookup per
# assignment. Note _setattr will be used again below if
# does_cache_hash is True.
"_setattr = _cached_setattr_get(self)"
)
extra_lines, fmt_setter, fmt_setter_with_converter = _determine_setters(
is_frozen, is_slotted, base_attr_map
)
lines.extend(extra_lines)
args = []
kw_only_args = []
attrs_to_validate = []
# This is a dictionary of names to validator and converter callables.
# Injecting this into __init__ globals lets us avoid lookups.
names_for_globals = {}
annotations = {"return": None}
for a in attrs:
if a.validator:
attrs_to_validate.append(a)
attr_name = a.name
has_on_setattr = a.on_setattr is not None or (
a.on_setattr is not setters.NO_OP and has_cls_on_setattr
)
# a.alias is set to maybe-mangled attr_name in _ClassBuilder if not
# explicitly provided
arg_name = a.alias
has_factory = isinstance(a.default, Factory)
maybe_self = "self" if has_factory and a.default.takes_self else ""
if a.converter and not isinstance(a.converter, Converter):
converter = Converter(a.converter)
else:
converter = a.converter
if a.init is False:
if has_factory:
init_factory_name = _INIT_FACTORY_PAT % (a.name,)
if converter is not None:
lines.append(
fmt_setter_with_converter(
attr_name,
init_factory_name + f"({maybe_self})",
has_on_setattr,
converter,
)
)
names_for_globals[converter._get_global_name(a.name)] = (
converter.converter
)
else:
lines.append(
fmt_setter(
attr_name,
init_factory_name + f"({maybe_self})",
has_on_setattr,
)
)
names_for_globals[init_factory_name] = a.default.factory
elif converter is not None:
lines.append(
fmt_setter_with_converter(
attr_name,
f"attr_dict['{attr_name}'].default",
has_on_setattr,
converter,
)
)
names_for_globals[converter._get_global_name(a.name)] = (
converter.converter
)
else:
lines.append(
fmt_setter(
attr_name,
f"attr_dict['{attr_name}'].default",
has_on_setattr,
)
)
elif a.default is not NOTHING and not has_factory:
arg = f"{arg_name}=attr_dict['{attr_name}'].default"
if a.kw_only:
kw_only_args.append(arg)
else:
args.append(arg)
if converter is not None:
lines.append(
fmt_setter_with_converter(
attr_name, arg_name, has_on_setattr, converter
)
)
names_for_globals[converter._get_global_name(a.name)] = (
converter.converter
)
else:
lines.append(fmt_setter(attr_name, arg_name, has_on_setattr))
elif has_factory:
arg = f"{arg_name}=NOTHING"
if a.kw_only:
kw_only_args.append(arg)
else:
args.append(arg)
lines.append(f"if {arg_name} is not NOTHING:")
init_factory_name = _INIT_FACTORY_PAT % (a.name,)
if converter is not None:
lines.append(
" "
+ fmt_setter_with_converter(
attr_name, arg_name, has_on_setattr, converter
)
)
lines.append("else:")
lines.append(
" "
+ fmt_setter_with_converter(
attr_name,
init_factory_name + "(" + maybe_self + ")",
has_on_setattr,
converter,
)
)
names_for_globals[converter._get_global_name(a.name)] = (
converter.converter
)
else:
lines.append(
" " + fmt_setter(attr_name, arg_name, has_on_setattr)
)
lines.append("else:")
lines.append(
" "
+ fmt_setter(
attr_name,
init_factory_name + "(" + maybe_self + ")",
has_on_setattr,
)
)
names_for_globals[init_factory_name] = a.default.factory
else:
if a.kw_only:
kw_only_args.append(arg_name)
else:
args.append(arg_name)
if converter is not None:
lines.append(
fmt_setter_with_converter(
attr_name, arg_name, has_on_setattr, converter
)
)
names_for_globals[converter._get_global_name(a.name)] = (
converter.converter
)
else:
lines.append(fmt_setter(attr_name, arg_name, has_on_setattr))
if a.init is True:
if a.type is not None and converter is None:
annotations[arg_name] = a.type
elif converter is not None and converter._first_param_type:
# Use the type from the converter if present.
annotations[arg_name] = converter._first_param_type
if attrs_to_validate: # we can skip this if there are no validators.
names_for_globals["_config"] = _config
lines.append("if _config._run_validators is True:")
for a in attrs_to_validate:
val_name = "__attr_validator_" + a.name
attr_name = "__attr_" + a.name
lines.append(f" {val_name}(self, {attr_name}, self.{a.name})")
names_for_globals[val_name] = a.validator
names_for_globals[attr_name] = a
if call_post_init:
lines.append("self.__attrs_post_init__()")
# Because this is set only after __attrs_post_init__ is called, a crash
# will result if post-init tries to access the hash code. This seemed
# preferable to setting this beforehand, in which case alteration to field
# values during post-init combined with post-init accessing the hash code
# would result in silent bugs.
if does_cache_hash:
if is_frozen:
if is_slotted:
init_hash_cache = f"_setattr('{_HASH_CACHE_FIELD}', None)"
else:
init_hash_cache = f"_inst_dict['{_HASH_CACHE_FIELD}'] = None"
else:
init_hash_cache = f"self.{_HASH_CACHE_FIELD} = None"
lines.append(init_hash_cache)
# For exceptions we rely on BaseException.__init__ for proper
# initialization.
if is_exc:
vals = ",".join(f"self.{a.name}" for a in attrs if a.init)
lines.append(f"BaseException.__init__(self, {vals})")
args = ", ".join(args)
pre_init_args = args
if kw_only_args:
# leading comma & kw_only args
args += f"{', ' if args else ''}*, {', '.join(kw_only_args)}"
pre_init_kw_only_args = ", ".join(
[
f"{kw_arg_name}={kw_arg_name}"
# We need to remove the defaults from the kw_only_args.
for kw_arg_name in (kwa.split("=")[0] for kwa in kw_only_args)
]
)
pre_init_args += ", " if pre_init_args else ""
pre_init_args += pre_init_kw_only_args
if call_pre_init and pre_init_has_args:
# If pre init method has arguments, pass same arguments as `__init__`.
lines[0] = f"self.__attrs_pre_init__({pre_init_args})"
# Python 3.7 doesn't allow backslashes in f strings.
NL = "\n "
return (
f"""def {method_name}(self, {args}):
{NL.join(lines) if lines else 'pass'}
""",
names_for_globals,
annotations,
)
def _default_init_alias_for(name: str) -> str:
"""
The default __init__ parameter name for a field.
This performs private-name adjustment via leading-unscore stripping,
and is the default value of Attribute.alias if not provided.
"""
return name.lstrip("_")
class Attribute:
"""
*Read-only* representation of an attribute.
.. warning::
You should never instantiate this class yourself.
The class has *all* arguments of `attr.ib` (except for ``factory`` which is
only syntactic sugar for ``default=Factory(...)`` plus the following:
- ``name`` (`str`): The name of the attribute.
- ``alias`` (`str`): The __init__ parameter name of the attribute, after
any explicit overrides and default private-attribute-name handling.
- ``inherited`` (`bool`): Whether or not that attribute has been inherited
from a base class.
- ``eq_key`` and ``order_key`` (`typing.Callable` or `None`): The
callables that are used for comparing and ordering objects by this
attribute, respectively. These are set by passing a callable to
`attr.ib`'s ``eq``, ``order``, or ``cmp`` arguments. See also
:ref:`comparison customization <custom-comparison>`.
Instances of this class are frequently used for introspection purposes
like:
- `fields` returns a tuple of them.
- Validators get them passed as the first argument.
- The :ref:`field transformer <transform-fields>` hook receives a list of
them.
- The ``alias`` property exposes the __init__ parameter name of the field,
with any overrides and default private-attribute handling applied.
.. versionadded:: 20.1.0 *inherited*
.. versionadded:: 20.1.0 *on_setattr*
.. versionchanged:: 20.2.0 *inherited* is not taken into account for
equality checks and hashing anymore.
.. versionadded:: 21.1.0 *eq_key* and *order_key*
.. versionadded:: 22.2.0 *alias*
For the full version history of the fields, see `attr.ib`.
"""
__slots__ = (
"name",
"default",
"validator",
"repr",
"eq",
"eq_key",
"order",
"order_key",
"hash",
"init",
"metadata",
"type",
"converter",
"kw_only",
"inherited",
"on_setattr",
"alias",
)
def __init__(
self,
name,
default,
validator,
repr,
cmp, # XXX: unused, remove along with other cmp code.
hash,
init,
inherited,
metadata=None,
type=None,
converter=None,
kw_only=False,
eq=None,
eq_key=None,
order=None,
order_key=None,
on_setattr=None,
alias=None,
):
eq, eq_key, order, order_key = _determine_attrib_eq_order(
cmp, eq_key or eq, order_key or order, True
)
# Cache this descriptor here to speed things up later.
bound_setattr = _OBJ_SETATTR.__get__(self)
# Despite the big red warning, people *do* instantiate `Attribute`
# themselves.
bound_setattr("name", name)
bound_setattr("default", default)
bound_setattr("validator", validator)
bound_setattr("repr", repr)
bound_setattr("eq", eq)
bound_setattr("eq_key", eq_key)
bound_setattr("order", order)
bound_setattr("order_key", order_key)
bound_setattr("hash", hash)
bound_setattr("init", init)
bound_setattr("converter", converter)
bound_setattr(
"metadata",
(
types.MappingProxyType(dict(metadata)) # Shallow copy
if metadata
else _EMPTY_METADATA_SINGLETON
),
)
bound_setattr("type", type)
bound_setattr("kw_only", kw_only)
bound_setattr("inherited", inherited)
bound_setattr("on_setattr", on_setattr)
bound_setattr("alias", alias)
def __setattr__(self, name, value):
raise FrozenInstanceError()
@classmethod
def from_counting_attr(cls, name, ca, type=None):
# type holds the annotated value. deal with conflicts:
if type is None:
type = ca.type
elif ca.type is not None:
msg = "Type annotation and type argument cannot both be present"
raise ValueError(msg)
inst_dict = {
k: getattr(ca, k)
for k in Attribute.__slots__
if k
not in (
"name",
"validator",
"default",
"type",
"inherited",
) # exclude methods and deprecated alias
}
return cls(
name=name,
validator=ca._validator,
default=ca._default,
type=type,
cmp=None,
inherited=False,
**inst_dict,
)
# Don't use attrs.evolve since fields(Attribute) doesn't work
def evolve(self, **changes):
"""
Copy *self* and apply *changes*.
This works similarly to `attrs.evolve` but that function does not work
with {class}`Attribute`.
It is mainly meant to be used for `transform-fields`.
.. versionadded:: 20.3.0
"""
new = copy.copy(self)
new._setattrs(changes.items())
return new
# Don't use _add_pickle since fields(Attribute) doesn't work
def __getstate__(self):
"""
Play nice with pickle.
"""
return tuple(
getattr(self, name) if name != "metadata" else dict(self.metadata)
for name in self.__slots__
)
def __setstate__(self, state):
"""
Play nice with pickle.
"""
self._setattrs(zip(self.__slots__, state))
def _setattrs(self, name_values_pairs):
bound_setattr = _OBJ_SETATTR.__get__(self)
for name, value in name_values_pairs:
if name != "metadata":
bound_setattr(name, value)
else:
bound_setattr(
name,
(
types.MappingProxyType(dict(value))
if value
else _EMPTY_METADATA_SINGLETON
),
)
_a = [
Attribute(
name=name,
default=NOTHING,
validator=None,
repr=True,
cmp=None,
eq=True,
order=False,
hash=(name != "metadata"),
init=True,
inherited=False,
alias=_default_init_alias_for(name),
)
for name in Attribute.__slots__
]
Attribute = _add_hash(
_add_eq(
_add_repr(Attribute, attrs=_a),
attrs=[a for a in _a if a.name != "inherited"],
),
attrs=[a for a in _a if a.hash and a.name != "inherited"],
)
class _CountingAttr:
"""
Intermediate representation of attributes that uses a counter to preserve
the order in which the attributes have been defined.
*Internal* data structure of the attrs library. Running into is most
likely the result of a bug like a forgotten `@attr.s` decorator.
"""
__slots__ = (
"counter",
"_default",
"repr",
"eq",
"eq_key",
"order",
"order_key",
"hash",
"init",
"metadata",
"_validator",
"converter",
"type",
"kw_only",
"on_setattr",
"alias",
)
__attrs_attrs__ = (
*tuple(
Attribute(
name=name,
alias=_default_init_alias_for(name),
default=NOTHING,
validator=None,
repr=True,
cmp=None,
hash=True,
init=True,
kw_only=False,
eq=True,
eq_key=None,
order=False,
order_key=None,
inherited=False,
on_setattr=None,
)
for name in (
"counter",
"_default",
"repr",
"eq",
"order",
"hash",
"init",
"on_setattr",
"alias",
)
),
Attribute(
name="metadata",
alias="metadata",
default=None,
validator=None,
repr=True,
cmp=None,
hash=False,
init=True,
kw_only=False,
eq=True,
eq_key=None,
order=False,
order_key=None,
inherited=False,
on_setattr=None,
),
)
cls_counter = 0
def __init__(
self,
default,
validator,
repr,
cmp,
hash,
init,
converter,
metadata,
type,
kw_only,
eq,
eq_key,
order,
order_key,
on_setattr,
alias,
):
_CountingAttr.cls_counter += 1
self.counter = _CountingAttr.cls_counter
self._default = default
self._validator = validator
self.converter = converter
self.repr = repr
self.eq = eq
self.eq_key = eq_key
self.order = order
self.order_key = order_key
self.hash = hash
self.init = init
self.metadata = metadata
self.type = type
self.kw_only = kw_only
self.on_setattr = on_setattr
self.alias = alias
def validator(self, meth):
"""
Decorator that adds *meth* to the list of validators.
Returns *meth* unchanged.
.. versionadded:: 17.1.0
"""
if self._validator is None:
self._validator = meth
else:
self._validator = and_(self._validator, meth)
return meth
def default(self, meth):
"""
Decorator that allows to set the default for an attribute.
Returns *meth* unchanged.
Raises:
DefaultAlreadySetError: If default has been set before.
.. versionadded:: 17.1.0
"""
if self._default is not NOTHING:
raise DefaultAlreadySetError()
self._default = Factory(meth, takes_self=True)
return meth
_CountingAttr = _add_eq(_add_repr(_CountingAttr))
class Factory:
"""
Stores a factory callable.
If passed as the default value to `attrs.field`, the factory is used to
generate a new value.
Args:
factory (typing.Callable):
A callable that takes either none or exactly one mandatory
positional argument depending on *takes_self*.
takes_self (bool):
Pass the partially initialized instance that is being initialized
as a positional argument.
.. versionadded:: 17.1.0 *takes_self*
"""
__slots__ = ("factory", "takes_self")
def __init__(self, factory, takes_self=False):
self.factory = factory
self.takes_self = takes_self
def __getstate__(self):
"""
Play nice with pickle.
"""
return tuple(getattr(self, name) for name in self.__slots__)
def __setstate__(self, state):
"""
Play nice with pickle.
"""
for name, value in zip(self.__slots__, state):
setattr(self, name, value)
_f = [
Attribute(
name=name,
default=NOTHING,
validator=None,
repr=True,
cmp=None,
eq=True,
order=False,
hash=True,
init=True,
inherited=False,
)
for name in Factory.__slots__
]
Factory = _add_hash(_add_eq(_add_repr(Factory, attrs=_f), attrs=_f), attrs=_f)
class Converter:
"""
Stores a converter callable.
Allows for the wrapped converter to take additional arguments. The
arguments are passed in the order they are documented.
Args:
converter (Callable): A callable that converts the passed value.
takes_self (bool):
Pass the partially initialized instance that is being initialized
as a positional argument. (default: `False`)
takes_field (bool):
Pass the field definition (an :class:`Attribute`) into the
converter as a positional argument. (default: `False`)
.. versionadded:: 24.1.0
"""
__slots__ = (
"converter",
"takes_self",
"takes_field",
"_first_param_type",
"_global_name",
"__call__",
)
def __init__(self, converter, *, takes_self=False, takes_field=False):
self.converter = converter
self.takes_self = takes_self
self.takes_field = takes_field
ex = _AnnotationExtractor(converter)
self._first_param_type = ex.get_first_param_type()
if not (self.takes_self or self.takes_field):
self.__call__ = lambda value, _, __: self.converter(value)
elif self.takes_self and not self.takes_field:
self.__call__ = lambda value, instance, __: self.converter(
value, instance
)
elif not self.takes_self and self.takes_field:
self.__call__ = lambda value, __, field: self.converter(
value, field
)
else:
self.__call__ = lambda value, instance, field: self.converter(
value, instance, field
)
rt = ex.get_return_type()
if rt is not None:
self.__call__.__annotations__["return"] = rt
@staticmethod
def _get_global_name(attr_name: str) -> str:
"""
Return the name that a converter for an attribute name *attr_name*
would have.
"""
return f"__attr_converter_{attr_name}"
def _fmt_converter_call(self, attr_name: str, value_var: str) -> str:
"""
Return a string that calls the converter for an attribute name
*attr_name* and the value in variable named *value_var* according to
`self.takes_self` and `self.takes_field`.
"""
if not (self.takes_self or self.takes_field):
return f"{self._get_global_name(attr_name)}({value_var})"
if self.takes_self and self.takes_field:
return f"{self._get_global_name(attr_name)}({value_var}, self, attr_dict['{attr_name}'])"
if self.takes_self:
return f"{self._get_global_name(attr_name)}({value_var}, self)"
return f"{self._get_global_name(attr_name)}({value_var}, attr_dict['{attr_name}'])"
def __getstate__(self):
"""
Return a dict containing only converter and takes_self -- the rest gets
computed when loading.
"""
return {
"converter": self.converter,
"takes_self": self.takes_self,
"takes_field": self.takes_field,
}
def __setstate__(self, state):
"""
Load instance from state.
"""
self.__init__(**state)
_f = [
Attribute(
name=name,
default=NOTHING,
validator=None,
repr=True,
cmp=None,
eq=True,
order=False,
hash=True,
init=True,
inherited=False,
)
for name in ("converter", "takes_self", "takes_field")
]
Converter = _add_hash(
_add_eq(_add_repr(Converter, attrs=_f), attrs=_f), attrs=_f
)
def make_class(
name, attrs, bases=(object,), class_body=None, **attributes_arguments
):
r"""
A quick way to create a new class called *name* with *attrs*.
Args:
name (str): The name for the new class.
attrs( list | dict):
A list of names or a dictionary of mappings of names to `attr.ib`\
s / `attrs.field`\ s.
The order is deduced from the order of the names or attributes
inside *attrs*. Otherwise the order of the definition of the
attributes is used.
bases (tuple[type, ...]): Classes that the new class will subclass.
class_body (dict):
An optional dictionary of class attributes for the new class.
attributes_arguments: Passed unmodified to `attr.s`.
Returns:
type: A new class with *attrs*.
.. versionadded:: 17.1.0 *bases*
.. versionchanged:: 18.1.0 If *attrs* is ordered, the order is retained.
.. versionchanged:: 23.2.0 *class_body*
"""
if isinstance(attrs, dict):
cls_dict = attrs
elif isinstance(attrs, (list, tuple)):
cls_dict = {a: attrib() for a in attrs}
else:
msg = "attrs argument must be a dict or a list."
raise TypeError(msg)
pre_init = cls_dict.pop("__attrs_pre_init__", None)
post_init = cls_dict.pop("__attrs_post_init__", None)
user_init = cls_dict.pop("__init__", None)
body = {}
if class_body is not None:
body.update(class_body)
if pre_init is not None:
body["__attrs_pre_init__"] = pre_init
if post_init is not None:
body["__attrs_post_init__"] = post_init
if user_init is not None:
body["__init__"] = user_init
type_ = types.new_class(name, bases, {}, lambda ns: ns.update(body))
# For pickling to work, the __module__ variable needs to be set to the
# frame where the class is created. Bypass this step in environments where
# sys._getframe is not defined (Jython for example) or sys._getframe is not
# defined for arguments greater than 0 (IronPython).
with contextlib.suppress(AttributeError, ValueError):
type_.__module__ = sys._getframe(1).f_globals.get(
"__name__", "__main__"
)
# We do it here for proper warnings with meaningful stacklevel.
cmp = attributes_arguments.pop("cmp", None)
(
attributes_arguments["eq"],
attributes_arguments["order"],
) = _determine_attrs_eq_order(
cmp,
attributes_arguments.get("eq"),
attributes_arguments.get("order"),
True,
)
cls = _attrs(these=cls_dict, **attributes_arguments)(type_)
# Only add type annotations now or "_attrs()" will complain:
cls.__annotations__ = {
k: v.type for k, v in cls_dict.items() if v.type is not None
}
return cls
# These are required by within this module so we define them here and merely
# import into .validators / .converters.
@attrs(slots=True, unsafe_hash=True)
class _AndValidator:
"""
Compose many validators to a single one.
"""
_validators = attrib()
def __call__(self, inst, attr, value):
for v in self._validators:
v(inst, attr, value)
def and_(*validators):
"""
A validator that composes multiple validators into one.
When called on a value, it runs all wrapped validators.
Args:
validators (~collections.abc.Iterable[typing.Callable]):
Arbitrary number of validators.
.. versionadded:: 17.1.0
"""
vals = []
for validator in validators:
vals.extend(
validator._validators
if isinstance(validator, _AndValidator)
else [validator]
)
return _AndValidator(tuple(vals))
def pipe(*converters):
"""
A converter that composes multiple converters into one.
When called on a value, it runs all wrapped converters, returning the
*last* value.
Type annotations will be inferred from the wrapped converters', if they
have any.
converters (~collections.abc.Iterable[typing.Callable]):
Arbitrary number of converters.
.. versionadded:: 20.1.0
"""
def pipe_converter(val, inst, field):
for c in converters:
val = c(val, inst, field) if isinstance(c, Converter) else c(val)
return val
if not converters:
# If the converter list is empty, pipe_converter is the identity.
A = typing.TypeVar("A")
pipe_converter.__annotations__.update({"val": A, "return": A})
else:
# Get parameter type from first converter.
t = _AnnotationExtractor(converters[0]).get_first_param_type()
if t:
pipe_converter.__annotations__["val"] = t
last = converters[-1]
if not PY_3_11_PLUS and isinstance(last, Converter):
last = last.__call__
# Get return type from last converter.
rt = _AnnotationExtractor(last).get_return_type()
if rt:
pipe_converter.__annotations__["return"] = rt
return Converter(pipe_converter, takes_self=True, takes_field=True)