Skip to content

Extensions

All extensions are used as async context managers.

OnVisitExtension

Runs for each node in the expression

Source code in dynamic_expressions/extensions.py
@runtime_checkable
class OnVisitExtension[Context: EmptyContext](Protocol):
    def on_visit(
        self,
        *,
        node: Node,
        provided_context: Context,
        execution_context: ExecutionContext,
    ) -> AbstractAsyncContextManager[None]: ...

CacheExtension

Caches results for nodes.

Base cache extension

Source code in dynamic_expressions/cache/base.py
class CacheExtension[
    Context: EmptyContext,
](OnVisitExtension[Context]):
    policies: Sequence[CachePolicy[Context]]
    default_serializer: Serializer[Any]
    _policy_cache: MutableMapping[type[Node], CachePolicy[Context] | None]

    @contextlib.asynccontextmanager
    async def on_visit(
        self,
        *,
        node: Node,
        provided_context: Context,
        execution_context: ExecutionContext,
    ) -> AsyncIterator[None]:
        policy = self._get_policy(node)

        if policy is None or node in execution_context.cache:
            yield
            return

        key = policy.key(node, provided_context)
        serializer = policy.serializer or self.default_serializer
        cached_value = await self.get_cache(key)
        deserialized_value: object | None = None
        if cached_value is not None:
            deserialized_value = serializer.deserialize(cached_value)
            execution_context.cache[node] = deserialized_value

        yield

        if (
            node in execution_context.cache
            and deserialized_value != execution_context.cache[node]
        ):
            await self.set_cache(
                key=key,
                value=serializer.serialize(execution_context.cache[node]),
                policy=policy,
            )

    def _get_policy(self, node: Node) -> CachePolicy[Context] | None:
        node_cls = type(node)
        if node_cls in self._policy_cache:
            return self._policy_cache[node_cls]

        self._policy_cache[node_cls] = next(
            (policy for policy in self.policies if isinstance(node, policy.types)),
            None,
        )
        return self._policy_cache[node_cls]

    @abc.abstractmethod
    async def get_cache(self, key: str) -> Any | None: ...  # noqa: ANN401

    @abc.abstractmethod
    async def set_cache(
        self,
        key: str,
        value: Any,  # noqa: ANN401
        policy: CachePolicy[Context],
    ) -> None: ...

Redis-backed cache extension

Source code in dynamic_expressions/cache/redis.py
class RedisCacheExtension[Context: EmptyContext](CacheExtension[Context]):
    def __init__(
        self,
        client: RedisClient,
        policies: Sequence[CachePolicy[Context]],
        default_serializer: Serializer[Any],
    ) -> None:
        self.policies = policies
        self.default_serializer = default_serializer
        self._client = client
        self._policy_cache = {}

    async def get_cache(self, key: str) -> bytes | None:
        return await self._client.get(name=key)

    async def set_cache(
        self,
        key: str,
        value: bytes,
        policy: CachePolicy[Context],
    ) -> None:
        await self._client.set(name=key, value=value, ex=policy.ttl)