Extensions
All extensions are implemented as asynchronous context managers.
OnVisitExtension
Runs for each node in the expression
Source code in dynamic_expressions/extensions.py
| @runtime_checkable
class OnVisitExtension[Context: EmptyContext](Protocol):
def on_visit(
self,
*,
node: Node,
provided_context: Context,
execution_context: ExecutionContext,
) -> AbstractAsyncContextManager[None]: ...
|
CacheExtension
Caches the results computed for nodes.
Base cache extension
Source code in dynamic_expressions/cache/base.py
| class CacheExtension[
Context: EmptyContext,
](OnVisitExtension[Context]):
policies: Sequence[CachePolicy[Context]]
default_serializer: Serializer[Any]
_policy_cache: MutableMapping[type[Node], CachePolicy[Context] | None]
@contextlib.asynccontextmanager
async def on_visit(
self,
*,
node: Node,
provided_context: Context,
execution_context: ExecutionContext,
) -> AsyncIterator[None]:
policy = self._get_policy(node)
if policy is None or node in execution_context.cache:
yield
return
key = policy.key(node, provided_context)
serializer = policy.serializer or self.default_serializer
cached_value = await self.get_cache(key)
deserialized_value: object | None = None
if cached_value is not None:
deserialized_value = serializer.deserialize(cached_value)
execution_context.cache[node] = deserialized_value
yield
if (
node in execution_context.cache
and deserialized_value != execution_context.cache[node]
):
await self.set_cache(
key=key,
value=serializer.serialize(execution_context.cache[node]),
policy=policy,
)
def _get_policy(self, node: Node) -> CachePolicy[Context] | None:
node_cls = type(node)
if node_cls in self._policy_cache:
return self._policy_cache[node_cls]
self._policy_cache[node_cls] = next(
(policy for policy in self.policies if isinstance(node, policy.types)),
None,
)
return self._policy_cache[node_cls]
@abc.abstractmethod
async def get_cache(self, key: str) -> Any | None: ... # noqa: ANN401
@abc.abstractmethod
async def set_cache(
self,
key: str,
value: Any, # noqa: ANN401
policy: CachePolicy[Context],
) -> None: ...
|
Redis-backed cache extension
Source code in dynamic_expressions/cache/redis.py
| class RedisCacheExtension[Context: EmptyContext](CacheExtension[Context]):
def __init__(
self,
client: RedisClient,
policies: Sequence[CachePolicy[Context]],
default_serializer: Serializer[Any],
) -> None:
self.policies = policies
self.default_serializer = default_serializer
self._client = client
self._policy_cache = {}
async def get_cache(self, key: str) -> bytes | None:
return await self._client.get(name=key)
async def set_cache(
self,
key: str,
value: bytes,
policy: CachePolicy[Context],
) -> None:
await self._client.set(name=key, value=value, ex=policy.ttl)
|