# Source code for apptools.naming.context

# (C) Copyright 2005-2022 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
""" The base class for all naming contexts. """


# Enthought library imports.
from traits.api import Any, Dict, Event, HasTraits
from traits.api import Property, Str

# Local imports.
from .binding import Binding
from .exception import InvalidNameError, NameAlreadyBoundError
from .exception import NameNotFoundError, NotContextError
from .naming_event import NamingEvent
from .naming_manager import naming_manager
from .unique_name import make_unique_name


# Keys under which naming settings are stored in a context's environment.
INITIAL_CONTEXT_FACTORY = "apptools.naming.factory.initial"
OBJECT_FACTORIES = "apptools.naming.factory.object"
STATE_FACTORIES = "apptools.naming.factory.state"


# The default environment: every 'Context' factory property starts out as its
# own (distinct) empty list.
ENVIRONMENT = {
    factory_key: [] for factory_key in (OBJECT_FACTORIES, STATE_FACTORIES)
}


class Context(HasTraits):
    """ The base class for all naming contexts.

    A context maps names to objects.  Names are '/'-separated paths (see
    '_parse_name'); a name with a single component is handled by this context
    and a name with several components is resolved by delegating everything
    after the first component to the context bound to that first component.

    The public methods implement the path handling and the event
    notification; the actual storage operations are performed by the
    protected '_xxx' methods.
    """

    # Keys for environment properties.
    INITIAL_CONTEXT_FACTORY = INITIAL_CONTEXT_FACTORY
    OBJECT_FACTORIES = OBJECT_FACTORIES
    STATE_FACTORIES = STATE_FACTORIES

    #### 'Context' interface ##################################################

    # The naming environment in effect for this context.
    environment = Dict(ENVIRONMENT)

    # The name of the context within its own namespace.
    namespace_name = Property(Str)

    #### Events ####

    # Fired when an object has been added to the context (either via 'bind' or
    # 'create_subcontext').
    object_added = Event(NamingEvent)

    # Fired when an object has been changed (via 'rebind').
    object_changed = Event(NamingEvent)

    # Fired when an object has been removed from the context (either via
    # 'unbind' or 'destroy_subcontext').
    object_removed = Event(NamingEvent)

    # Fired when an object in the context has been renamed (via 'rename').
    object_renamed = Event(NamingEvent)

    # Fired when the contents of the context have changed dramatically.
    context_changed = Event(NamingEvent)

    #### Protected 'Context' interface #######################################

    # The bindings in the context.
    _bindings = Dict(Str, Any)

    ###########################################################################
    # 'Context' interface.
    ###########################################################################

    #### Properties ###########################################################

    def _get_namespace_name(self):
        """ Return the name of the context within its own namespace.

        That is the full-path, through the namespace this context participates
        in, to get to this context.  For example, if the root context of the
        namespace was called 'Foo', and there was a subcontext of that called
        'Bar', and we were within that and called 'Baz', then this should
        return 'Foo/Bar/Baz'.

        This base implementation always returns the empty string.

        """

        # FIXME: We'd like to raise an exception and force implementors to
        # decide what to do.  However, it appears to be pretty common that
        # most Context implementations do not override this method -- possibly
        # because the comments aren't clear on what this is supposed to be?
        #
        # Anyway, if we raise an exception then it is impossible to use any
        # evaluations when building a Traits UI for a Context.  That is, the
        # Traits UI can't include items that have a 'visible_when' or
        # 'enabled_when' evaluation.  This is because the Traits evaluation
        # code calls the 'get()' method on the Context which attempts to
        # retrieve the current namespace_name value.
        # raise OperationNotSupportedError()
        return ""

    #### Methods ##############################################################

    def bind(self, name, obj, make_contexts=False):
        """ Binds a name to an object.

        If 'make_contexts' is True then any missing intermediate contexts are
        created automatically.

        Raises 'InvalidNameError' if the name is empty,
        'NameAlreadyBoundError' if the final component is already bound and
        'NameNotFoundError' if an intermediate component is not bound and
        'make_contexts' is False.

        """

        if len(name) == 0:
            raise InvalidNameError("empty name")

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            # Is the name already bound?
            if self._is_bound(atom):
                raise NameAlreadyBoundError(name)

            # Do the actual bind.
            self._bind(atom, obj)

            # Trait event notification.
            self.object_added = NamingEvent(
                new_binding=Binding(name=name, obj=obj, context=self)
            )

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                if make_contexts:
                    self._create_subcontext(components[0])

                else:
                    raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            next_context.bind("/".join(components[1:]), obj, make_contexts)

    def rebind(self, name, obj, make_contexts=False):
        """ Binds an object to a name that may already be bound.

        If 'make_contexts' is True then any missing intermediate contexts are
        created automatically.

        The object may be a different object but may also be the same object
        that is already bound to the specified name.  The name may or may not
        be already used.  Think of this as a safer version of 'bind' since
        this one will never raise an exception regarding a name being used.

        Raises 'InvalidNameError' if the name is empty and
        'NameNotFoundError' if an intermediate component is not bound and
        'make_contexts' is False.

        """

        if len(name) == 0:
            raise InvalidNameError("empty name")

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            # Do the actual rebind.
            self._rebind(components[0], obj)

            # Trait event notification.
            self.object_changed = NamingEvent(
                new_binding=Binding(name=name, obj=obj, context=self)
            )

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                if make_contexts:
                    self._create_subcontext(components[0])

                else:
                    raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            next_context.rebind("/".join(components[1:]), obj, make_contexts)

    def unbind(self, name):
        """ Unbinds a name.

        Raises 'InvalidNameError' if the name is empty and
        'NameNotFoundError' if any component of the name is not bound.

        """

        if len(name) == 0:
            raise InvalidNameError("empty name")

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            if not self._is_bound(atom):
                raise NameNotFoundError(name)

            # Lookup the object that we are unbinding to use in the event
            # notification.
            obj = self._lookup(atom)

            # Do the actual unbind.
            self._unbind(atom)

            # Trait event notification.
            self.object_removed = NamingEvent(
                old_binding=Binding(name=name, obj=obj, context=self)
            )

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            next_context.unbind("/".join(components[1:]))

    def rename(self, old_name, new_name):
        """ Binds a new name to an object.

        When both names have a single component the rename happens entirely
        in this context and 'object_renamed' is fired.  Otherwise the object
        is looked up, bound under the new name and unbound from the old one.

        Raises 'InvalidNameError' if either name is empty.  For a rename
        within this context, raises 'NameNotFoundError' if the old name is not
        bound and 'NameAlreadyBoundError' if the new name is already bound.

        """

        if len(old_name) == 0 or len(new_name) == 0:
            raise InvalidNameError("empty name")

        # Parse the names.
        old_components = self._parse_name(old_name)
        new_components = self._parse_name(new_name)

        # If there is exactly one component in BOTH names then the operation
        # takes place ENTIRELY in this context.
        if len(old_components) == 1 and len(new_components) == 1:
            # Is the old name actually bound?
            if not self._is_bound(old_name):
                raise NameNotFoundError(old_name)

            # Is the new name already bound?
            if self._is_bound(new_name):
                raise NameAlreadyBoundError(new_name)

            # Do the actual rename.
            self._rename(old_name, new_name)

            # Lookup the object that we are renaming to use in the event
            # notification.
            obj = self._lookup(new_name)

            # Trait event notification.
            self.object_renamed = NamingEvent(
                old_binding=Binding(name=old_name, obj=obj, context=self),
                new_binding=Binding(name=new_name, obj=obj, context=self),
            )

        else:
            # fixme: This really needs to be transactional in case the bind
            # succeeds but the unbind fails.  To be safe should we just not
            # support cross-context renaming for now?!?!
            #
            # Lookup the object.
            obj = self.lookup(old_name)

            # Bind the new name.
            self.bind(new_name, obj)

            # Unbind the old one.
            self.unbind(old_name)

    def lookup(self, name):
        """ Resolves a name relative to this context.

        An empty name resolves to this context itself.

        Raises 'NameNotFoundError' if any component of the name is not bound.

        """

        # If the name is empty we return the context itself.
        if len(name) == 0:
            # fixme: The JNDI spec. says that this should return a COPY of
            # the context.
            return self

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            if not self._is_bound(atom):
                raise NameNotFoundError(name)

            # Do the actual lookup.
            obj = self._lookup(atom)

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            obj = next_context.lookup("/".join(components[1:]))

        return obj

    # fixme: Non-JNDI
    def lookup_binding(self, name):
        """ Looks up the binding for a name relative to this context.

        Raises 'InvalidNameError' if the name is empty and
        'NameNotFoundError' if any component of the name is not bound.

        """

        if len(name) == 0:
            raise InvalidNameError("empty name")

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            if not self._is_bound(atom):
                raise NameNotFoundError(name)

            # Do the actual lookup.
            binding = self._lookup_binding(atom)

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            binding = next_context.lookup_binding("/".join(components[1:]))

        return binding

    # fixme: Non-JNDI
    def lookup_context(self, name):
        """ Resolves a name relative to this context.

        The name MUST resolve to a context.  An empty name resolves to this
        context itself.

        Raises 'NameNotFoundError' if any component of the name is not bound.
        The base '_get_next_context' raises 'NotContextError' for a component
        that is bound to something other than a context.

        """

        # If the name is empty we return the context itself.
        if len(name) == 0:
            # fixme: The JNDI spec. says that this should return a COPY of
            # the context.
            return self

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            if not self._is_bound(atom):
                raise NameNotFoundError(name)

            # Do the actual lookup.
            obj = self._get_next_context(atom)

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])

            # Recurse with 'lookup_context' (not 'lookup') so that the final
            # component is also required to be a context.
            obj = next_context.lookup_context("/".join(components[1:]))

        return obj

    def create_subcontext(self, name):
        """ Creates a sub-context.

        Returns the newly created sub-context.

        Raises 'InvalidNameError' if the name is empty,
        'NameAlreadyBoundError' if the final component is already bound and
        'NameNotFoundError' if an intermediate component is not bound.

        """

        if len(name) == 0:
            raise InvalidNameError("empty name")

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            # Is the name already bound?
            if self._is_bound(atom):
                raise NameAlreadyBoundError(name)

            # Do the actual creation of the sub-context.
            sub = self._create_subcontext(atom)

            # Trait event notification.
            self.object_added = NamingEvent(
                new_binding=Binding(name=name, obj=sub, context=self)
            )

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            sub = next_context.create_subcontext("/".join(components[1:]))

        return sub

    def destroy_subcontext(self, name):
        """ Destroys a sub-context.

        Raises 'InvalidNameError' if the name is empty, 'NameNotFoundError'
        if any component of the name is not bound and 'NotContextError' if
        the name is bound to something that is not a context.

        """

        if len(name) == 0:
            raise InvalidNameError("empty name")

        # Parse the name.
        components = self._parse_name(name)

        # If there is exactly one component in the name then the operation
        # takes place in this context.
        if len(components) == 1:
            atom = components[0]

            if not self._is_bound(atom):
                raise NameNotFoundError(name)

            # Lookup the object that we are destroying to use in the event
            # notification.
            obj = self._lookup(atom)

            if not self._is_context(atom):
                raise NotContextError(name)

            # Do the actual destruction of the sub-context.
            self._destroy_subcontext(atom)

            # Trait event notification.
            self.object_removed = NamingEvent(
                old_binding=Binding(name=name, obj=obj, context=self)
            )

        # Otherwise, attempt to continue resolution into the next context.
        else:
            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            next_context.destroy_subcontext("/".join(components[1:]))

    # fixme: Non-JNDI
    def get_unique_name(self, prefix):
        """ Returns a name that is unique within the context.

        The name returned will start with the specified prefix.

        """

        return make_unique_name(
            prefix, existing=self.list_names(""), format="%s (%d)"
        )

    def list_names(self, name=""):
        """ Lists the names bound in a context.

        An empty name lists this context; otherwise the context that the name
        resolves to is listed.

        Raises 'NameNotFoundError' if a component of the name is not bound.

        """

        # If the name is empty then the operation takes place in this context.
        if len(name) == 0:
            names = self._list_names()

        # Otherwise, attempt to continue resolution into the next context.
        else:
            # Parse the name.
            components = self._parse_name(name)

            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            names = next_context.list_names("/".join(components[1:]))

        return names

    def list_bindings(self, name=""):
        """ Lists the bindings in a context.

        An empty name lists this context; otherwise the context that the name
        resolves to is listed.

        Raises 'NameNotFoundError' if a component of the name is not bound.

        """

        # If the name is empty then the operation takes place in this context.
        if len(name) == 0:
            bindings = self._list_bindings()

        # Otherwise, attempt to continue resolution into the next context.
        else:
            # Parse the name.
            components = self._parse_name(name)

            if not self._is_bound(components[0]):
                raise NameNotFoundError(components[0])

            next_context = self._get_next_context(components[0])
            bindings = next_context.list_bindings("/".join(components[1:]))

        return bindings

    # fixme: Non-JNDI
    def is_context(self, name):
        """ Returns True if the name is bound to a context.

        An empty name refers to this context, so the result is True.

        Raises 'NameNotFoundError' if any component of the name is not bound.

        """

        # If the name is empty then it refers to this context.
        if len(name) == 0:
            is_context = True

        else:
            # Parse the name.
            components = self._parse_name(name)

            # If there is exactly one component in the name then the operation
            # takes place in this context.
            if len(components) == 1:
                atom = components[0]

                if not self._is_bound(atom):
                    raise NameNotFoundError(name)

                # Do the actual check.
                is_context = self._is_context(atom)

            # Otherwise, attempt to continue resolution into the next context.
            else:
                if not self._is_bound(components[0]):
                    raise NameNotFoundError(components[0])

                next_context = self._get_next_context(components[0])
                is_context = next_context.is_context("/".join(components[1:]))

        return is_context

    # fixme: Non-JNDI
    def search(self, obj):
        """ Returns a list of namespace names that are bound to obj.

        The names are relative to this context.  Searching for None always
        returns an empty list.

        """

        # don't look for None
        if obj is None:
            return []

        # Obj is bound to these names relative to this context
        names = []

        # path contain the name components down to the current context
        path = []

        self._search(obj, names, path, {})

        return names

    ###########################################################################
    # Protected 'Context' interface.
    ###########################################################################

    def _parse_name(self, name):
        """ Parse a name into a list of components.

        e.g. 'foo/bar/baz' -> ['foo', 'bar', 'baz']

        """

        return name.split("/")

    def _is_bound(self, name):
        """ Is a name bound in this context? """

        return name in self._bindings

    def _lookup(self, name):
        """ Looks up a name in this context. """

        obj = self._bindings[name]

        return naming_manager.get_object_instance(obj, name, self)

    def _lookup_binding(self, name):
        """ Looks up the binding for a name in this context. """

        return Binding(name=name, obj=self._lookup(name), context=self)

    def _bind(self, name, obj):
        """ Binds a name to an object in this context. """

        state = naming_manager.get_state_to_bind(obj, name, self)
        self._bindings[name] = state

    def _rebind(self, name, obj):
        """ Rebinds a name to an object in this context. """

        self._bind(name, obj)

    def _unbind(self, name):
        """ Unbinds a name from this context. """

        del self._bindings[name]

    def _rename(self, old_name, new_name):
        """ Renames an object in this context. """

        # Bind the new name.
        self._bindings[new_name] = self._bindings[old_name]

        # Unbind the old one.
        del self._bindings[old_name]

    def _create_subcontext(self, name):
        """ Creates a sub-context of this context.

        The sub-context is an instance of this context's own class and is
        given this context's environment.

        """

        sub = self.__class__(environment=self.environment)
        self._bindings[name] = sub

        return sub

    def _destroy_subcontext(self, name):
        """ Destroys a sub-context of this context. """

        del self._bindings[name]

    def _list_bindings(self):
        """ Lists the bindings in this context. """

        return [self._lookup_binding(name) for name in self._list_names()]

    def _list_names(self):
        """ Lists the names bound in this context. """

        return list(self._bindings)

    def _is_context(self, name):
        """ Returns True if a name is bound to a context.

        Returns False (rather than raising) when the name is bound to
        something that is not a context.

        """

        # '_get_next_context' reports a non-context by raising, so translate
        # that into the boolean answer this predicate promises.
        try:
            return self._get_next_context(name) is not None

        except NotContextError:
            return False

    def _get_next_context(self, name):
        """ Returns the next context.

        Raises 'NotContextError' if the name is not bound to a context.

        """

        obj = self._lookup(name)

        # If the object is a context then everything is just dandy.
        if isinstance(obj, Context):
            next_context = obj

        else:
            raise NotContextError(name)

        return next_context

    def _search(self, obj, names, path, searched):
        """ Append to names any name bound to obj.

        Join path and name with '/' to form a complete name from the top
        context.

        'searched' records the contexts that have already been descended
        into so that each one is only searched once.

        """

        # Check the bindings recursively.
        for binding in self.list_bindings():
            if binding.obj is obj:
                path.append(binding.name)
                names.append("/".join(path))
                path.pop()

            if (
                isinstance(binding.obj, Context)
                and binding.obj not in searched
            ):
                path.append(binding.name)
                searched[binding.obj] = True
                binding.obj._search(obj, names, path, searched)
                path.pop()