# (C) Copyright 2005-2024 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
"""
Code to support recording to a readable and executable Python script.
FIXME:
- Support for dictionaries?
"""
import builtins
import warnings
from traits.api import (
HasTraits,
List,
Str,
Dict,
Bool,
Property,
Int,
Instance,
)
from traits.util.camel_case import camel_case_to_python
###############################################################################
# `_RegistryData` class.
###############################################################################
class _RegistryData(HasTraits):
    """Bookkeeping record stored by the recorder for one registered object.

    It holds the object's script id, a link to the record of its parent
    (if any), the trait name under which the object lives on that parent,
    and the names of the traits being listened to.
    """

    # Object's script ID.  Falls back to "<parent script id>.<trait name>"
    # when no explicit id has been stored in `_script_id`.
    script_id = Property(Str)

    # Path to object in object hierarchy.
    path = Property(Str)

    # Parent data for this object if any.
    parent_data = Instance("_RegistryData", allow_none=True)

    # The name of the trait on the parent which is this object.
    trait_name_on_parent = Str("")

    # List of traits we are listening for on this object.
    names = List(Str)

    # Nested recordable instances on the object.
    sub_recordables = List(Str)

    # List of traits that are lists.
    list_names = List(Str)

    # Explicitly assigned script id; empty when the id must be derived
    # from the parent.
    _script_id = Str("")

    ###########################################################################
    # Non-public interface.
    ###########################################################################
    def _get_path(self):
        """Return the dotted path of the object, or '' if it has no parent."""
        pdata = self.parent_data
        if pdata is None:
            return ""

        ppath = pdata.path
        tnop = self.trait_name_on_parent
        if "[" in tnop and len(ppath) == 0:
            # If the object is a nested object through an iterator and the
            # parent has no path of its own, anchor the path on the
            # parent's script id instead.
            return pdata.script_id + "." + tnop
        return ppath + "." + tnop

    def _get_script_id(self):
        """Return the explicit script id, or derive one from the parent.

        Returns '' when there is neither an explicit id nor a parent to
        derive one from.
        """
        sid = self._script_id
        if len(sid) > 0:
            return sid

        pdata = self.parent_data
        if pdata is None:
            # Nothing to derive the id from; an empty id is handled by the
            # recorder's trait listener.
            return ""
        return pdata.script_id + "." + self.trait_name_on_parent

    def _set_script_id(self, value):
        """Store an explicit script id."""
        self._script_id = value
###############################################################################
# `RecorderError` class.
###############################################################################
class RecorderError(Exception):
    """Raised when the recorder is asked about an unregistered object."""
###############################################################################
# `Recorder` class.
###############################################################################
class Recorder(HasTraits):
    """Record trait changes and function calls as lines of Python script.

    Objects are registered with the recorder, which assigns each a script
    id and listens to their traits.  While `recording` is true, changes
    are appended to `lines` as executable statements.
    """

    # The lines of code recorded.
    lines = List(Str)

    # Are we recording or not?
    recording = Bool(False, desc="if script recording is enabled or not")

    # The Python script we have recorded so far.  This is just a
    # convenience trait for the `get_code()` method.
    script = Property(Str)

    ########################################
    # Private traits.

    # Dict used to store information on objects registered.  It stores a
    # unique name for the object and its path in the object hierarchy
    # traversed.
    _registry = Dict

    # Reverse registry with keys as script_id and object as value.
    _reverse_registry = Dict

    # A mapping to generate unique names for objects.  The key is the
    # name used (which is something derived from the class name of the
    # object) and the value is an integer describing the number of times
    # that variable name has been used earlier.
    _name_map = Dict(Str, Int)

    # A list of special reserved script IDs.  This is handy when you
    # want a particular object to have an easy to read script ID and not
    # the default one based on its class name.  This leads to slightly
    # easier to read scripts.
    _special_ids = List

    # What are the known names in the script?  By known names we mean
    # names which are actually bound to objects.
    _known_ids = List(Str)

    # The known types in the namespace.
    _known_types = List(Str)

    # A guard to check if we are currently in a recorded function call,
    # in which case we don't want to do any recording.
    _in_function = Bool(False)

    ###########################################################################
    # `Recorder` interface.
    ###########################################################################
    def record(self, code):
        """Record a string to be stored to the output file.

        Nothing is recorded unless `recording` is true and no recorded
        function call is currently executing.

        Parameters
        ----------
        code : str
            A string of text.
        """
        if not self.recording or self._in_function:
            return
        # Analyze the code first: this may emit extra lines that bind the
        # object name used on the left hand side.
        self._analyze_code(code)
        self.lines.append(code)

    def register(
        self,
        object,
        parent=None,
        trait_name_on_parent="",
        ignore=None,
        known=False,
        script_id=None,
    ):
        """Register an object with the recorder.  This sets up the
        object for recording.

        By default all traits (except those starting and ending with
        '_') are recorded.  For attributes that are themselves
        recordable, one may mark traits with a 'record' metadata as
        follows:

        - If metadata `record=False` is set, the nested object will not be
          recorded.

        - If `record=True`, then that object is also recorded if it is
          not `None`.

        If the object is a list or dict that is marked with
        `record=True`, the list is itself not listened to for changes
        but all its contents are registered.

        If the `object` has a trait named `recorder` then this recorder
        instance will be set to it if possible.

        Parameters
        ----------
        object : Instance(HasTraits)
            The object to register in the registry.
        parent : Instance(HasTraits)
            An optional parent object in which `object` is contained
        trait_name_on_parent : str
            An optional trait name of the `object` in the `parent`.
        ignore : list(str)
            An optional list of trait names on the `object` to be
            ignored.  The list is not modified.
        known : bool
            Optional specification if the `object` id is known on the
            interpreter.  This is needed if you are manually injecting
            code to define/create an object.
        script_id : str
            Optionally specify a script_id to use for this object.  It
            is not guaranteed that this ID will be used since it may
            already be in use.

        Raises
        ------
        RecorderError
            If `parent` is given but is not itself registered.
        """
        registry = self._registry

        # Do nothing if the object is already registered.
        if object in registry:
            return

        # When parent is specified the trait_name_on_parent must also be.
        if parent is not None:
            assert (
                len(trait_name_on_parent) > 0
            ), "trait_name_on_parent is required when parent is given"

        # Work on a private copy so the caller's list is never mutated.
        ignored = set(ignore) if ignore is not None else set()

        if isinstance(object, HasTraits):
            # Always ignore these.
            ignored.update(("trait_added", "trait_modified"))

            sub_recordables = list(object.traits(record=True))
            # Find all the trait names we must ignore.
            ignored.update(object.traits(record=False))
            # The traits to listen for.
            tnames = [
                t
                for t in object.trait_names()
                if not t.startswith("_")
                and not t.endswith("_")
                and t not in ignored
            ]
            # Find all list traits.
            # NOTE(review): 5 is presumably the Traits default-value code
            # for a trait list object -- confirm against
            # traits.constants.DefaultValue.
            trts = object.traits()
            list_names = [
                t
                for t in tnames
                if getattr(trts[t].trait_type, "default_value_type", None)
                == 5
            ]
        else:
            # No traits, so we can't do much.
            sub_recordables = []
            tnames = []
            list_names = []

        # Setup the registry data.

        # If a script id is supplied try and use it.
        sid = ""
        if script_id is not None:
            r_registry = self._reverse_registry
            # Keep appending "1" until the id no longer clashes.
            while script_id in r_registry:
                script_id = "%s1" % script_id
            sid = script_id
            # Add the chosen id to special_id list.
            self._special_ids.append(sid)

        if parent is None:
            pdata = None
            if len(sid) == 0:
                sid = self._get_unique_name(object)
        else:
            pdata = self._get_registry_data(parent)
            if "[" in trait_name_on_parent:
                # If the object is a nested object through an iterator,
                # we instantiate it and don't refer to it through the
                # path, this makes scripting convenient.
                sid = self._get_unique_name(object)

        # Register the object with the data.
        data = _RegistryData(
            script_id=sid,
            parent_data=pdata,
            trait_name_on_parent=trait_name_on_parent,
            names=tnames,
            sub_recordables=sub_recordables,
            list_names=list_names,
        )
        registry[object] = data

        # Now get the script id of the object -- note that if sid is ''
        # above then the script_id is computed from that of the parent.
        sid = data.script_id
        # Setup reverse registry so we can get the object from the
        # script_id.
        self._reverse_registry[sid] = object

        # Record the script_id if the known argument is explicitly set to
        # True.
        if known:
            self._known_ids.append(sid)

        # Try and set the recorder attribute if necessary.
        if hasattr(object, "recorder"):
            try:
                object.recorder = self
            except Exception as e:
                # Best effort only: warn and carry on.
                msg = "Cannot set 'recorder' trait of object %r: " "%s" % (
                    object,
                    e,
                )
                warnings.warn(msg, RuntimeWarning)

        if isinstance(object, HasTraits):
            # Add handler for lists.
            for name in list_names:
                object.on_trait_change(
                    self._list_items_listner, "%s_items" % name
                )

            # Register all sub-recordables.
            for name in sub_recordables:
                obj = getattr(object, name)
                if isinstance(obj, list):
                    # Don't register the object itself but register its
                    # children.
                    for i, child in enumerate(obj):
                        attr = "%s[%d]" % (name, i)
                        self.register(
                            child, parent=object, trait_name_on_parent=attr
                        )
                elif obj is not None:
                    self.register(
                        obj, parent=object, trait_name_on_parent=name
                    )
                # Listen for changes to the trait itself so the newly
                # assigned object can also be listened to.
                object.on_trait_change(self._object_changed_handler, name)

            # Now add listner for the object itself.
            object.on_trait_change(self._listner, tnames)

    def unregister(self, object):
        """Unregister the given object from the recorder.  This inverts
        the logic of the `register(...)` method.

        Unregistering an object that is not registered is a no-op.
        """
        registry = self._registry

        # Do nothing if the object isn't registered.
        if object not in registry:
            return

        data = registry[object]

        # Try and unset the recorder attribute if necessary.
        if hasattr(object, "recorder"):
            try:
                object.recorder = None
            except Exception as e:
                # Best effort only: warn and carry on.
                msg = "Cannot unset 'recorder' trait of object %r:" "%s" % (
                    object,
                    e,
                )
                warnings.warn(msg, RuntimeWarning)

        if isinstance(object, HasTraits):
            # Remove all list_items handlers.
            for name in data.list_names:
                object.on_trait_change(
                    self._list_items_listner, "%s_items" % name, remove=True
                )

            # Unregister all sub-recordables.
            for name in data.sub_recordables:
                obj = getattr(object, name)
                if isinstance(obj, list):
                    # Unregister the children.
                    for child in obj:
                        self.unregister(child)
                elif obj is not None:
                    self.unregister(obj)
                # Remove the trait handler for trait assignments.
                object.on_trait_change(
                    self._object_changed_handler, name, remove=True
                )

            # Now remove listner for the object itself.
            object.on_trait_change(self._listner, data.names, remove=True)

        # Remove the object data from the registry etc.
        sid = data.script_id
        if sid in self._known_ids:
            self._known_ids.remove(sid)
        del self._reverse_registry[sid]
        del registry[object]

    def save(self, file):
        """Save the recorded lines to the given file.  It does not close
        the file.
        """
        file.write(self.get_code())
        file.flush()

    def record_function(self, func, args, kw):
        """Record a function call given the function and its
        arguments.

        The function is always called and its result returned; the call
        is only recorded when `recording` is true and no other recorded
        call is in progress.

        Parameters
        ----------
        func : callable
            The function (or undecorated method) to call.
        args : sequence
            Positional arguments; for methods the first one is `self`.
        kw : dict
            Keyword arguments.
        """
        if not self.recording or self._in_function:
            return func(*args, **kw)

        # Build the call string before calling, so arguments are rendered
        # in their pre-call state.
        call_str = self._function_as_string(func, args, kw)

        # Call the function with recording suppressed for anything it
        # triggers internally.
        try:
            self._in_function = True
            result = func(*args, **kw)
        finally:
            self._in_function = False

        if func.__name__ == "__init__":
            # A constructor: emit the class import line (possibly empty)
            # and bind the new instance to its script id.
            f_self = args[0]
            code = self._import_class_string(f_self.__class__)
            self.lines.append(code)
            return_str = self._registry.get(f_self).script_id
        else:
            # Register the result if it is not None.
            return_str = self._return_as_string(result)

        if len(return_str) > 0:
            self.lines.append("%s = %s" % (return_str, call_str))
        else:
            self.lines.append(call_str)
        return result

    def ui_save(self):
        """Save recording to file, pop up a UI dialog to find out where
        and close the file when done.
        """
        from pyface.api import FileDialog, OK

        wildcard = "Python files (*.py)|*.py|" + FileDialog.WILDCARD_ALL
        dialog = FileDialog(
            title="Save Script", action="save as", wildcard=wildcard
        )
        if dialog.open() == OK:
            # The context manager closes the file even if saving fails.
            with open(dialog.path, "w") as f:
                self.save(f)

    def clear(self):
        """Clears all previous recorded state and unregisters all
        registered objects."""
        # First unregister any registered objects.  Unregistering may
        # remove several entries at once (children), so re-read each time.
        registry = self._registry
        while len(registry) > 0:
            self.unregister(next(iter(registry)))

        # Clear the various lists.
        self.lines[:] = []
        self._registry.clear()
        self._known_ids[:] = []
        self._name_map.clear()
        self._reverse_registry.clear()
        self._known_types[:] = []
        self._special_ids[:] = []

    def get_code(self):
        """Returns the recorded lines as a string of printable code."""
        return "\n".join(self.lines) + "\n"

    def is_registered(self, object):
        """Returns True if the given object is registered with the
        recorder."""
        return object in self._registry

    def get_script_id(self, object):
        """Returns the script_id of a registered object.  Useful when
        you want to manually add a record statement.

        Raises `RecorderError` if the object is not registered.
        """
        return self._get_registry_data(object).script_id

    def get_object_path(self, object):
        """Returns the path in the object hierarchy of a registered
        object.  Useful for debugging.

        Raises `RecorderError` if the object is not registered.
        """
        return self._get_registry_data(object).path

    def write_script_id_in_namespace(self, script_id):
        """If a script_id is not known in the current script's namespace,
        this sets it using the path of the object or actually
        instantiating it.  If this is not possible (since the script_id
        matches no existing object), nothing is recorded but the
        framework is notified that the particular script_id is available
        in the namespace.  This is useful when you want to inject code
        in the namespace to create a particular object.
        """
        if not self.recording:
            return

        known_ids = self._known_ids
        if script_id in known_ids:
            return

        obj = self._reverse_registry.get(script_id)
        # Add the ID to the known_ids.
        known_ids.append(script_id)
        if obj is None:
            return

        data = self._registry.get(obj)
        path = data.path
        if len(path) > 0:
            # Record code for instantiation of object.
            result = "%s = %s" % (script_id, path)
        else:
            # This is not the best thing to do but better than
            # nothing.
            result = self._import_class_string(obj.__class__)
            cls = obj.__class__.__name__
            result += "\n%s = %s()" % (script_id, cls)

        if len(result) > 0:
            self.lines.extend(result.split("\n"))

    ###########################################################################
    # Non-public interface.
    ###########################################################################
    def _get_unique_name(self, obj):
        """Return a unique object name (a string).  Note that this does
        not cache the object, so if called with the same object 3 times
        you'll get three different names.
        """
        cname = obj.__class__.__name__
        nm = self._name_map
        builtin = cname in builtins.__dict__
        if builtin:
            if hasattr(obj, "__name__"):
                cname = obj.__name__
        else:
            cname = camel_case_to_python(cname)

        special_ids = self._special_ids
        result = ""
        # Keep generating candidates until one does not collide with a
        # reserved special id.
        while len(result) == 0 or result in special_ids:
            if cname in nm:
                count = nm[cname] + 1
                nm[cname] = count
                result = "%s%d" % (cname, count)
            else:
                nm[cname] = 0
                # The first id doesn't need a number if it isn't builtin.
                if builtin:
                    result = "%s0" % cname
                else:
                    result = cname
        return result

    def _get_registry_data(self, object):
        """Get the data for an object from registry.

        Raises
        ------
        RecorderError
            If the object is not registered.
        """
        data = self._registry.get(object)
        if data is None:
            msg = (
                "Recorder: Can't get script_id since object %s not registered"
            )
            # One-element tuple so that tuple objects format correctly.
            raise RecorderError(msg % (object,))
        return data

    def _listner(self, object, name, old, new):
        """The listner for trait changes on an object.

        This is called by child listners or when any of the recordable
        object's traits change when recording to a script is enabled.

        Parameters
        ----------
        object : Object which has changed.
        name : extended name of attribute that changed.
        old : Old value.
        new : New value.
        """
        if not self.recording or self._in_function:
            return

        new_repr = repr(new)
        sid = self._get_registry_data(object).script_id
        if len(sid) == 0:
            msg = "%s = %s" % (name, new_repr)
        else:
            msg = "%s.%s = %s" % (sid, name, new_repr)

        # A "<...>" repr cannot be executed, so record it as a comment.
        if new_repr.startswith("<") and new_repr.endswith(">"):
            self.record("# " + msg)
        else:
            self.record(msg)

    def _list_items_listner(self, object, name, old, event):
        """The listner for *_items on list traits of the object."""
        # Set the path of registered objects in the modified list and
        # all their children by refreshing the index stored in each
        # item's trait_name_on_parent.
        registry = self._registry
        sid = registry.get(object).script_id
        # Strip the trailing "_items" to get the list trait's name.
        trait_name = name[:-6]
        items = getattr(object, trait_name)
        for i, item in enumerate(items):
            if item in registry:
                data = registry.get(item)
                if len(data.trait_name_on_parent) > 0:
                    data.trait_name_on_parent = "%s[%d]" % (trait_name, i)

        # Record the change as a slice assignment.
        if self.recording and not self._in_function:
            index = event.index
            n_removed = len(event.removed)
            slice_str = "[%d:%d]" % (index, index + n_removed)
            rhs = ", ".join(
                [self._object_as_string(item) for item in event.added]
            )
            target = "%s.%s" % (sid, trait_name)
            self.record("%s%s = [%s]" % (target, slice_str, rhs))

    def _object_changed_handler(self, object, name, old, new):
        """Called when a child recordable object has been reassigned."""
        registry = self._registry
        if old is not None and old in registry:
            self.unregister(old)
        if new is not None and new not in registry:
            self.register(new, parent=object, trait_name_on_parent=name)

    def _get_script(self):
        """Property getter for `script`."""
        return self.get_code()

    def _analyze_code(self, code):
        """Make sure the object named on the left of `code` is bound.

        If the first whitespace-separated token of `code` contains a dot,
        the part before the first dot is passed to
        `write_script_id_in_namespace`.
        """
        tokens = code.split()
        lhs = tokens[0] if tokens else ""
        if "." in lhs:
            ob_name = lhs.split(".")[0]
            self.write_script_id_in_namespace(ob_name)

    def _function_as_string(self, func, args, kw):
        """Return a string representing the function call."""
        func_name = func.__name__
        func_code = func.__code__
        # Even if func is really a decorated method it never shows up as
        # a bound or unbound method here, so we have to inspect the
        # argument names to figure out if this is a method or function.
        if func_code.co_argcount > 0 and func_code.co_varnames[0] == "self":
            # This is a method, the first argument is bound to self.
            f_self = args[0]
            # Convert the remaining arguments to strings.
            argl = [self._object_as_string(arg) for arg in args[1:]]
            # If this is __init__ we special case it.
            if func_name == "__init__":
                # Register the object.
                self.register(f_self, known=True)
                func_name = f_self.__class__.__name__
            else:
                sid = self._object_as_string(f_self)
                func_name = "%s.%s" % (sid, func_name)
        else:
            argl = [self._object_as_string(arg) for arg in args]

        # Convert the keyword args.
        argl.extend(
            "%s=%s" % (key, self._object_as_string(value))
            for key, value in kw.items()
        )

        # Make a string representation of the args, kw.
        return "%s(%s)" % (func_name, ", ".join(argl))

    def _is_arbitrary_object(self, object):
        """Return True if the object is an arbitrary non-primitive object.

        We assume that if the hex id of the object is in its string
        representation then it is an arbitrary object.
        """
        hex_id = "%x" % id(object)
        return hex_id.upper() in repr(object).upper()

    def _object_as_string(self, object):
        """Return a string representing the object.

        Registered objects are rendered as their script id; anything
        whose repr does not look like a default object repr is rendered
        with `repr`; other objects are registered first.
        """
        registry = self._registry
        try:
            registered = object in registry
        except TypeError:
            # Unhashable values (lists, dicts, ...) can never be registry
            # keys, so treat them as unregistered.
            registered = False

        if registered:
            # Return script id if the object is known; create the script
            # id on the namespace if needed before that.
            sid = registry.get(object).script_id
            base_id = sid.split(".")[0]
            self.write_script_id_in_namespace(base_id)
            return sid

        if not self._is_arbitrary_object(object):
            return repr(object)

        # If we get here, we just register the object and call ourselves
        # again to do the needful.
        self.register(object)
        return self._object_as_string(object)

    def _return_as_string(self, object):
        """Return a string given a returned object from a function.

        Returns '' for `None` and for plain numbers, booleans and
        strings; otherwise the (possibly newly registered) object's
        script id, which is also marked as known.
        """
        ignore = (float, complex, bool, int, str)
        if object is None or type(object) in ignore:
            return ""

        # If object is not known, register it.
        registry = self._registry
        if object not in registry:
            self.register(object)
        result = registry.get(object).script_id

        # Since this is returned it is known on the namespace.
        known_ids = self._known_ids
        if result not in known_ids:
            known_ids.append(result)
        return result

    def _import_class_string(self, cls):
        """Return an import statement for `cls`, or '' if none is needed.

        No import is produced for builtin names or for classes whose
        import has already been produced once.
        """
        cname = cls.__name__
        if cname in builtins.__dict__:
            return ""

        mod = cls.__module__
        typename = "%s.%s" % (mod, cname)
        if typename in self._known_types:
            return ""

        self._known_types.append(typename)
        return "from %s import %s" % (mod, cname)