Source code for libres.context.core

import libres
import threading

from cached_property import cached_property
from contextlib import contextmanager
from libres.modules import errors


missing = object()
required = object()


[docs]class StoppableService(object): """ Services inheriting from this class have their stop_service method called when the service is discarded. Note that this only happens when a service is replaced with a new one and not when libres is stopped (i.e. this is *not* a deconstructor). """ def stop_service(self): pass
[docs]class ContextServicesMixin(object): """ Provides access methods to the context's services. Expects the class that uses the mixin to provide self.context. The results are cached for performance. """ @cached_property def is_allocation_exposed(self): return self.context.get_service('exposure').is_allocation_exposed @cached_property def generate_uuid(self): return self.context.get_service('uuid_generator') @cached_property def validate_email(self): return self.context.get_service('email_validator')
[docs] def clear_cache(self): """ Clears the cache of the mixin. """ try: del self.is_allocation_exposed except AttributeError: pass try: del self.generate_uuid except AttributeError: pass try: del self.validate_email except AttributeError: pass
@property def session_provider(self): return self.context.get_service('session_provider') @property def session(self): """ Returns the current session. """ return self.session_provider.session()
[docs] def close(self): """ Closes the current session. """ self.session.close()
@property def begin_nested(self): return self.session.begin_nested def commit(self): return self.session.commit() def rollback(self): return self.session.rollback()
[docs]class Context(object): """ Used throughout Libres, the context holds settings like the database connection string and services like the json dumps/loads functions that should be used. Contexts allow consumers of the Libres library to override these settings / services as they wish. It also makes sure that multiple consumers of Libres can co-exist in a single process, as each consumer must operate on it's own context. Libres holds all contexts in libres.registry and provides a master_context. When a consumer registers its own context, all lookups happen on the custom context. If that context can provide a service or a setting, it is used. If the custom context can't provide a service or a setting, the master_context is used instead. In other words, the custom context inherits from the master context. Note that contexts not meant to be changed often. Classes talking to the database usually cache data form the context freely. That means basically that after changing the context you should get a fresh :class:`~libres.db.scheduler.Scheduler` instance or call :meth:`~.ContextServicesMixin.clear_cache`. A context may be registered as follows:: from libres import registry my_context = registry.register_context('my_app') See also :class:`~libres.context.registry.Registry` """
[docs] def __init__(self, name, registry=None, parent=None, locked=False): self.name = name self.registry = registry or libres.registry self.values = {} self.parent = parent self.locked = False self.thread_lock = threading.RLock()
def __repr__(self): return "<Libres Context(name='{}')>".format(self.name) @contextmanager def as_current_context(self): with self.registry.context(self.name): yield def switch_to(self): self.registry.switch_context(self.name) def lock(self): with self.thread_lock: self.locked = True def unlock(self): with self.thread_lock: self.locked = False def get(self, key): if key in self.values: return self.values[key] elif self.parent: return self.parent.get(key) else: return missing def set(self, key, value): if self.locked: raise errors.ContextIsLocked with self.thread_lock: # If a value already exists it could be a stoppable service. # Stoppable services are called before they are stop so they # can clean up after themselves without having to wait for the GC. if isinstance(self.values.get(key), StoppableService): self.values[key].stop_service() self.values[key] = value def get_setting(self, name): return self.get('settings.{}'.format(name)) def set_setting(self, name, value): with self.thread_lock: self.set('settings.{}'.format(name), value) def get_service(self, name): service_id = '/'.join(('service', name)) service = self.get(service_id) if service is missing: raise errors.UnknownService(service_id) cache_id = '/'.join(('service', name, 'cache')) cache = self.get(cache_id) # no cache if cache is missing: return service(self) else: # first call, cache it! if cache is required: self.set(cache_id, service(self)) # nth call, use cached value return self.get(cache_id) def set_service(self, name, factory, cache=False): with self.thread_lock: service_id = '/'.join(('service', name)) self.set(service_id, factory) if cache: cache_id = '/'.join(('service', name, 'cache')) self.set(cache_id, required)