Source code for pyface.image.image

# (C) Copyright 2005-2023 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
#  Date:   11/03/2007

""" Defines the ImageLibrary object used to manage Pyface image libraries.
"""

import sys
from os import (
    environ,
    listdir,
    remove,
    stat,
    makedirs,
    rename,
    access,
    R_OK,
    W_OK,
    X_OK,
)
from os.path import (
    join,
    isdir,
    isfile,
    splitext,
    abspath,
    dirname,
    basename,
    exists,
)
from stat import ST_MTIME
from platform import system
from zipfile import is_zipfile, ZipFile, ZIP_DEFLATED
import datetime
import time
from _thread import allocate_lock
from threading import Thread

from traits.api import (
    HasPrivateTraits,
    Property,
    Str,
    Int,
    List,
    Dict,
    File,
    Instance,
    Bool,
    Undefined,
    TraitError,
    Float,
    Any,
    cached_property,
)
from traits.trait_base import get_resource_path, traits_home

from pyface.api import ImageResource
from pyface.resource_manager import resource_manager
from pyface.resource.resource_reference import (
    ImageReference,
    ResourceReference,
)
from pyface.ui_traits import HasMargin, HasBorder, Alignment

# ---------------------------------------------------------------------------
#  Constants:
# ---------------------------------------------------------------------------

# Standard image file extensions:
ImageFileExts = (".png", ".gif", ".jpg", "jpeg")

# The image_cache root directory:
image_cache_path = join(traits_home(), "image_cache")

# Names of files that should not be copied when ceating a new library copy:
dont_copy_list = ("image_volume.py", "image_info.py", "license.txt")

# -- Code Generation Templates ----------------------------------------------

# Template for creating an ImageVolumeInfo object:
ImageVolumeInfoCodeTemplate = """        ImageVolumeInfo(
            description=%(description)s,
            copyright=%(copyright)s,
            license=%(license)s,
            image_names=%(image_names)s
        )"""

# Template for creating an ImageVolumeInfo license text:
ImageVolumeInfoTextTemplate = """Description:
    %s

Copyright:
    %s

License:
    %s

Applicable Images:
%s"""

# Template for creating an ImageVolume object:
ImageVolumeTemplate = """from pyface.image.image import ImageVolume, ImageVolumeInfo

volume = ImageVolume(
    category=%(category)s,
    keywords=%(keywords)s,
    aliases=%(aliases)s,
    time_stamp=%(time_stamp)s,
    info=[
%(info)s
    ]
)"""

# Template for creating an ImageVolume 'images' list:
ImageVolumeImagesTemplate = """from pyface.image.image import ImageInfo
from pyface.ui_traits import Margin, Border

images = [
%s
]"""

# Template for creating an ImageInfo object:
ImageInfoTemplate = """    ImageInfo(
        name=%(name)s,
        image_name=%(image_name)s,
        description=%(description)s,
        category=%(category)s,
        keywords=%(keywords)s,
        width=%(width)d,
        height=%(height)d,
        border=Border(%(bleft)d, %(bright)d, %(btop)d, %(bbottom)d),
        content=Margin(%(cleft)d, %(cright)d, %(ctop)d, %(cbottom)d),
        label=Margin(%(lleft)d, %(lright)d, %(ltop)d, %(lbottom)d),
        alignment=%(alignment)s
    )"""


[docs]def read_file(file_name): """ Returns the contents of the specified *file_name*. """ with open(file_name, "rb") as fh: return fh.read()
[docs]def write_file(file_name, data): """ Writes the specified data to the specified file. """ if isinstance(data, str): data = data.encode('utf8') with open(file_name, "wb") as fh: fh.write(data)
[docs]def get_python_value(source, name): """ Returns the value of a Python symbol loaded from a specified source code string. """ temp = {} exec(source.replace(b"\r", b""), globals(), temp) return temp[name]
[docs]def time_stamp_for(time): """ Returns a specified time as a text string. """ return datetime.datetime.utcfromtimestamp(time).strftime("%Y%m%d%H%M%S")
[docs]def add_object_prefix(dict, object, prefix): """ Adds all traits from a specified object to a dictionary with a specified name prefix. """ for name, value in object.trait_get().items(): dict[prefix + name] = value
[docs]def split_image_name(image_name): """ Splits a specified **image_name** into its constituent volume and file names and returns a tuple of the form: ( volume_name, file_name ). """ col = image_name.find(":") volume_name = image_name[1:col] file_name = image_name[col + 1:] if file_name.find(".") < 0: file_name += ".png" return (volume_name, file_name)
[docs]def join_image_name(volume_name, file_name): """ Joins a specified **volume_name** and **file_name** into an image name, and return the resulting image name. """ root, ext = splitext(file_name) if (ext == ".png") and (root.find(".") < 0): file_name = root return "@%s:%s" % (volume_name, file_name)
[docs]class FastZipFile(HasPrivateTraits): """ Provides fast access to zip files by keeping the underlying zip file open across multiple uses. """ #: The path to the zip file: path = File() #: The open zip file object (if None, the file is closed): zf = Property #: The time stamp of when the zip file was most recently accessed: time_stamp = Float() #: The lock used to manage access to the 'zf' trait between the two threads: access = Any() # -- Public Methods ---------------------------------------------------------
[docs] def namelist(self): """ Returns the names of all files in the top-level zip file directory. """ self.access.acquire() try: return self.zf.namelist() finally: self.access.release()
[docs] def read(self, file_name): """ Returns the contents of the specified **file_name** from the zip file. """ self.access.acquire() try: return self.zf.read(file_name) finally: self.access.release()
[docs] def close(self): """ Temporarily closes the zip file (usually while the zip file is being replaced by a different version). """ self.access.acquire() try: if self._zf is not None: self._zf.close() self._zf = None finally: self.access.release()
# -- Default Value Implementations ------------------------------------------ def _access_default(self): return allocate_lock() # -- Property Implementations ----------------------------------------------- def _get_zf(self): # Restart the time-out: self.time_stamp = time.time() if self._zf is None: self._zf = ZipFile(self.path, "r") if self._running is None: Thread(target=self._process).start() self._running = True return self._zf # -- Private Methods -------------------------------------------------------- def _process(self): """ Waits until the zip file has not been accessed for a while, then closes the file and exits. """ while True: time.sleep(1) self.access.acquire() if time.time() > (self.time_stamp + 2.0): if self._zf is not None: self._zf.close() self._zf = None self._running = None self.access.release() break self.access.release()
# ------------------------------------------------------------------------------- # 'ImageInfo' class: # -------------------------------------------------------------------------------
[docs]class ImageInfo(HasPrivateTraits): """ Defines a class that contains information about a specific Traits UI image. """ #: The volume this image belongs to: volume = Instance("ImageVolume") #: The user friendly name of the image: name = Str() #: The full image name (e.g. '@standard:floppy'): image_name = Str() #: A description of the image: description = Str() #: The category that the image belongs to: category = Str("General") #: A list of keywords used to describe/categorize the image: keywords = List(Str) #: The image width (in pixels): width = Int() #: The image height (in pixels): height = Int() #: The border inset: border = HasBorder #: The margin to use around the content: content = HasMargin #: The margin to use around the label: label = HasMargin #: The alignment to use for the label: alignment = Alignment #: The copyright that applies to this image: copyright = Property #: The license that applies to this image: license = Property #: A read-only string containing the Python code needed to construct this #: ImageInfo object: image_info_code = Property # -- Default Value Implementations ------------------------------------------ def _name_default(self): return split_image_name(self.image_name)[1] def _width_default(self): if self.volume is None: return 0 image = self.volume.image_resource(self.image_name) if image is None: self.height = 0 return 0 width, self.height = image.image_size(image.create_image()) return width def _height_default(self): if self.volume is None: return 0 image = self.volume.image_resource(self.image_name) if image is None: self.width = 0 return 0 self.width, height = image.image_size(image.create_image()) return height # -- Property Implementations ----------------------------------------------- def _get_image_info_code(self): data = dict( (name, repr(value)) for name, value in self.trait_get( "name", "image_name", "description", "category", "keywords", "alignment", ).items() ) data.update(self.trait_get("width", "height")) sides = ["left", "right", "top", "bottom"] data.update(("b" + name, getattr(self.border, name)) for name in sides) data.update( ("c" + name, getattr(self.content, name)) for name in sides ) data.update(("l" + name, getattr(self.label, name)) for name in sides) return ImageInfoTemplate % data def _get_copyright(self): return self._volume_info("copyright") def _get_license(self): return self._volume_info("license") # -- Private Methods -------------------------------------------------------- def _volume_info(self, name): """ Returns the VolumeInfo object that applies to this image. """ info = self.volume.volume_info(self.image_name) if info is not None: return getattr(info, name, "Unknown") return "Unknown"
# ------------------------------------------------------------------------------- # 'ImageVolumeInfo' class: # -------------------------------------------------------------------------------
[docs]class ImageVolumeInfo(HasPrivateTraits): #: A general description of the images: description = Str("No volume description specified.") #: The copyright that applies to the images: copyright = Str("No copyright information specified.") #: The license that applies to the images: license = Str("No license information specified.") #: The list of image names within the volume the information applies to. #: Note that an empty list means that the information applies to all images #: in the volume: image_names = List(Str) #: A read-only string containing the Python code needed to construct this #: ImageVolumeInfo object: image_volume_info_code = Property #: A read-only string containing the text describing the volume info: image_volume_info_text = Property # -- Property Implementations ----------------------------------------------- @cached_property def _get_image_volume_info_code(self): data = dict( (name, repr(getattr(self, name))) for name in ["description", "copyright", "license", "image_names"] ) return ImageVolumeInfoCodeTemplate % data @cached_property def _get_image_volume_info_text(self): description = self.description.replace("\n", "\n ") license = self.license.replace("\n", "\n ").strip() image_names = self.image_names image_names.sort() if len(image_names) == 0: image_names = ["All"] images = "\n".join([" - " + image_name for image_name in image_names]) return ImageVolumeInfoTextTemplate % ( description, self.copyright, license, images, ) # -- Public Methods ---------------------------------------------------------
[docs] def clone(self): """ Returns a copy of the ImageVolumeInfo object. """ return self.clone(["description", "copyright", "license"])
# ------------------------------------------------------------------------------- # 'ImageVolume' class: # -------------------------------------------------------------------------------
[docs]class ImageVolume(HasPrivateTraits): #: The canonical name of this volume: name = Str() #: The list of volume descriptors that apply to this volume: info = List(ImageVolumeInfo) #: The category that the volume belongs to: category = Str("General") #: A list of keywords used to describe the volume: keywords = List(Str) #: The list of aliases for this volume: aliases = List(Str) #: The path of the file that defined this volume: path = File() #: Is the path a zip file? is_zip_file = Bool(True) #: The FastZipFile object used to access the underlying zip file: zip_file = Instance(FastZipFile) #: The list of images available in the volume: images = List(ImageInfo) #: A dictionary mapping image names to ImageInfo objects: catalog = Property(observe="images") #: The time stamp of when the image library was last modified: time_stamp = Str() #: A read-only string containing the Python code needed to construct this #: ImageVolume object: image_volume_code = Property #: A read-only string containing the Python code needed to construct the #: 'images' list for this ImageVolume object: images_code = Property #: A read-only string containing the text describing the contents of the #: volume (description, copyright, license information, and the images they #: apply to): license_text = Property # -- Public Methods ---------------------------------------------------------
[docs] def update(self): """ Updates the contents of the image volume from the underlying image store, and saves the results. """ # Unlink all our current images: for image in self.images: image.volume = None # Make sure the images are up to date by deleting any current value: self.reset_traits(["images"]) # Save the new image volume information: self.save()
[docs] def save(self): """ Saves the contents of the image volume using the current contents of the **ImageVolume**. """ path = self.path if not self.is_zip_file: # Make sure the directory is writable: if not access(path, R_OK | W_OK | X_OK): return False # Make sure the directory and zip file are writable: elif (not access(dirname(path), R_OK | W_OK | X_OK)) or ( exists(path) and (not access(path, W_OK)) ): return False # Pre-compute the images code, because it can require a long time # to load all of the images so that we can determine their size, and we # don't want that time to interfere with the time stamp of the image # volume: images_code = self.images_code if not self.is_zip_file: # We need to time stamp when this volume info was generated, but # it needs to be the same or newer then the time stamp of the file # it is in. So we use the current time plus a 'fudge factor' to # allow for some slop in when the OS actually time stamps the file: self.time_stamp = time_stamp_for(time.time() + 5.0) # Write the volume manifest source code to a file: write_file(join(path, "image_volume.py"), self.image_volume_code) # Write the image info source code to a file: write_file(join(path, "image_info.py"), images_code) # Write a separate license file for human consumption: write_file(join(path, "license.txt"), self.license_text) return True # Create a temporary name for the new .zip file: file_name = path + ".###" # Create the new zip file: new_zf = ZipFile(file_name, "w", ZIP_DEFLATED) try: # Get the current zip file: cur_zf = self.zip_file # Copy all of the image files from the current zip file to the new # zip file: for name in cur_zf.namelist(): if name not in dont_copy_list: new_zf.writestr(name, cur_zf.read(name)) # Temporarily close the current zip file while we replace it with # the new version: cur_zf.close() # We need to time stamp when this volume info was generated, but # it needs to be the same or newer then the time stamp of the file # it is in. So we use the current time plus a 'fudge factor' to # allow for some slop in when the OS actually time stamps the file: self.time_stamp = time_stamp_for(time.time() + 10.0) # Write the volume manifest source code to the zip file: new_zf.writestr("image_volume.py", self.image_volume_code) # Write the image info source code to the zip file: new_zf.writestr("image_info.py", images_code) # Write a separate license file for human consumption: new_zf.writestr("license.txt", self.license_text) # Done creating the new zip file: new_zf.close() new_zf = None # Rename the original file to a temporary name, so we can give the # new file the original name. Note that unlocking the original zip # file after the previous close sometimes seems to take a while, # which is why we repeatedly try the rename until it either succeeds # or takes so long that it must have failed for another reason: temp_name = path + ".$$$" for i in range(50): try: rename(path, temp_name) break except Exception: time.sleep(0.1) try: rename(file_name, path) file_name = temp_name except: rename(temp_name, path) raise finally: if new_zf is not None: new_zf.close() remove(file_name) return True
[docs] def image_resource(self, image_name): """ Returns the ImageResource object for the specified **image_name**. """ # Get the name of the image file: volume_name, file_name = split_image_name(image_name) if self.is_zip_file: # See if we already have the image file cached in the file system: cache_file = self._check_cache(file_name) if cache_file is None: # If not cached, then create a zip file reference: ref = ZipFileReference( resource_factory=resource_manager.resource_factory, zip_file=self.zip_file, path=self.path, volume_name=self.name, file_name=file_name, ) else: # Otherwise, create a cache file reference: ref = ImageReference( resource_manager.resource_factory, filename=cache_file ) else: # Otherwise, create a normal file reference: ref = ImageReference( resource_manager.resource_factory, filename=join(self.path, file_name), ) # Create the ImageResource object using the reference (note that the # ImageResource class will not allow us to specify the reference in the # constructor): resource = ImageResource(file_name) resource._ref = ref # Return the ImageResource: return resource
[docs] def image_data(self, image_name): """ Returns the image data (i.e. file contents) for the specified image name. """ volume_name, file_name = split_image_name(image_name) if self.is_zip_file: return self.zip_file.read(file_name) else: return read_file(join(self.path, file_name))
[docs] def volume_info(self, image_name): """ Returns the ImageVolumeInfo object that corresponds to the image specified by **image_name**. """ for info in self.info: if (len(info.image_names) == 0) or ( image_name in info.image_names ): return info raise ValueError( "Volume info for image name {} not found.".format(repr(info)) )
# -- Default Value Implementations ------------------------------------------ def _info_default(self): return [ImageVolumeInfo()] def _images_default(self): return self._load_image_info() # -- Property Implementations ----------------------------------------------- @cached_property def _get_catalog(self): return dict((image.image_name, image) for image in self.images) def _get_image_volume_code(self): data = dict( (name, repr(value)) for name, value in self.trait_get( "description", "category", "keywords", "aliases", "time_stamp" ).items() ) data["info"] = ",\n".join( info.image_volume_info_code for info in self.info ) return ImageVolumeTemplate % data def _get_images_code(self): images = ",\n".join(info.image_info_code for info in self.images) return ImageVolumeImagesTemplate % images def _get_license_text(self): return ("\n\n%s\n" % ("-" * 79)).join( [info.image_volume_info_text for info in self.info] ) # -- Private Methods -------------------------------------------------------- def _load_image_info(self): """ Returns the list of ImageInfo objects for the images in the volume. """ # If there is no current path, then return a default list of images: if self.path == "": return [] time_stamp = time_stamp_for(stat(self.path)[ST_MTIME]) volume_name = self.name old_images = [] cur_images = [] if self.is_zip_file: zf = self.zip_file # Get the names of all top-level entries in the zip file: names = zf.namelist() # Check to see if there is an image info manifest file: if "image_info.py" in names: # Load the manifest code and extract the images list: old_images = get_python_value( zf.read("image_info.py"), "images" ) # Check to see if our time stamp is up to date with the file: if self.time_stamp < time_stamp: # If not, create an ImageInfo object for all image files # contained in the .zip file: for name in names: root, ext = splitext(name) if ext in ImageFileExts: cur_images.append( ImageInfo( name=root, image_name=join_image_name(volume_name, name), ) ) else: image_info_path = join(self.path, "image_info.py") if exists(image_info_path): # Load the manifest code and extract the images list: old_images = get_python_value( read_file(image_info_path), "images" ) # Check to see if our time stamp is up to data with the file: if self.time_stamp < time_stamp: # If not, create an ImageInfo object for each image file # contained in the path: for name in listdir(self.path): root, ext = splitext(name) if ext in ImageFileExts: cur_images.append( ImageInfo( name=root, image_name=join_image_name(volume_name, name), ) ) # Merge the old and current images into a single up to date list: if len(cur_images) == 0: images = old_images else: cur_image_set = dict( [(image.image_name, image) for image in cur_images] ) for old_image in old_images: cur_image = cur_image_set.get(old_image.image_name) if cur_image is not None: cur_image_set[old_image.image_name] = old_image cur_image.volume = self old_image.width = cur_image.width old_image.height = cur_image.height cur_image.volume = None images = list(cur_image_set.values()) # Set the new time stamp of the volume: self.time_stamp = time_stamp # Return the resulting sorted list as the default value: images.sort(key=lambda item: item.image_name) # Make sure all images reference this volume: for image in images: image.volume = self return images def _check_cache(self, file_name): """ Checks to see if the specified zip file name has been saved in the image cache. If it has, it returns the fully-qualified cache file name to use; otherwise it returns None. """ cache_file = join(image_cache_path, self.name, file_name) if exists(cache_file) and ( time_stamp_for(stat(cache_file)[ST_MTIME]) > self.time_stamp ): return cache_file return None
# ------------------------------------------------------------------------------- # 'ZipFileReference' class: # -------------------------------------------------------------------------------
[docs]class ZipFileReference(ResourceReference): #: The zip file to read; zip_file = Instance(FastZipFile) #: The volume name: volume_name = Str() #: The file within the zip file: file_name = Str() #: The name of the cached image file: cache_file = File() # -- The 'ResourceReference' API -------------------------------------------- #: The file name of the image (in this case, the cache file name): filename = Property # -- ResourceReference Interface Implementation -----------------------------
[docs] def load(self): """ Loads the resource. """ # Check if the cache file has already been created: cache_file = self.cache_file if cache_file == "": # Extract the data from the zip file: data = self.zip_file.read(self.file_name) # Try to create an image from the data, without writing it to a # file first: image = self.resource_factory.image_from_data(data, Undefined) if image is not None: return image # Make sure the correct image cache directory exists: cache_dir = join(image_cache_path, self.volume_name) if not exists(cache_dir): makedirs(cache_dir) # Write the image data to the cache file: cache_file = join(cache_dir, self.file_name) with open(cache_file, "wb") as fh: fh.write(data) # Save the cache file name in case we are called again: self.cache_file = cache_file # Release our reference to the zip file object: self.zip_file = None # Return the image data from the image cache file: return self.resource_factory.image_from_file(cache_file)
# -- Property Implementations ----------------------------------------------- def _get_filename(self): if self.cache_file == "": self.load() return self.cache_file
# ------------------------------------------------------------------------------- # 'ImageLibrary' class: # ------------------------------------------------------------------------------- class ImageLibrary(HasPrivateTraits): """ Manages Traits UI image libraries. """ #: The list of available image volumes in the library: volumes = List(ImageVolume) #: The volume dictionary (the keys are volume names, and the values are the #: corresponding ImageVolume objects): catalog = Dict(Str, ImageVolume) #: The list of available images in the library: images = Property(List, observe="volumes.items.images") # -- Private Traits --------------------------------------------------------- #: Mapping from a 'virtual' library name to a 'real' library name: aliases = Dict() # -- Public methods --------------------------------------------------------- def image_info(self, image_name): """ Returns the ImageInfo object corresponding to a specified **image_name**. """ volume = self.find_volume(image_name) if volume is not None: return volume.catalog.get(image_name) return None def image_resource(self, image_name): """ Returns an ImageResource object for the specified image name. """ # If no volume was specified, use the standard volume: if image_name.find(":") < 0: image_name = "@images:%s" % image_name[1:] # Find the correct volume, possible resolving any aliases used: volume = self.find_volume(image_name) # Find the image within the volume and return its ImageResource object: if volume is not None: return volume.image_resource(image_name) # Otherwise, the volume was not found: return None def find_volume(self, image_name): """ Returns the ImageVolume object corresponding to the specified **image_name** or None if the volume cannot be found. """ # Extract the volume name from the image name: volume_name, file_name = split_image_name(image_name) # Find the correct volume, possibly resolving any aliases used: catalog = self.catalog aliases = self.aliases while volume_name not in catalog: volume_name = aliases.get(volume_name) if volume_name is None: return None return catalog[volume_name] def add_volume(self, file_name=None): """ If **file_name** is a file, it adds an image volume specified by **file_name** to the image library. If **file_name** is a directory, it adds all image libraries contained in the directory to the image library. If **file_name** is omitted, all image libraries located in the *images* directory contained in the same directory as the caller are added. """ # If no file name was specified, derive a path from the caller's # source code location: if file_name is None: file_name = join(get_resource_path(2), "images") if isfile(file_name): # Load an image volume from the specified file: volume = self._add_volume(file_name) if volume is None: raise TraitError( "'%s' is not a valid image volume." % file_name ) if volume.name in self.catalog: self._duplicate_volume(volume.name) self.catalog[volume.name] = volume self.volumes.append(volume) elif isdir(file_name): # Load all image volumes from the specified path: catalog = self.catalog volumes = self._add_path(file_name) for volume in volumes: if volume.name in catalog: self._duplicate_volume(volume.name) catalog[volume.name] = volume self.volumes.extend(volumes) else: # Handle an unrecognized argument: raise TraitError( "The add method argument must be None or a file " "or directory path, but '%s' was specified." % file_name ) def add_path(self, volume_name, path=None): """ Adds the directory specified by **path** as a *virtual* volume called **volume_name**. All image files contained within path define the contents of the volume. If **path** is None, the *images* contained in the 'images' subdirectory of the same directory as the caller are is used as the path for the *virtual* volume.. """ # Make sure we don't already have a volume with that name: if volume_name in self.catalog: raise TraitError( ("The volume name '%s' is already in the image " "library.") % volume_name ) # If no path specified, derive one from the caller's source code # location: if path is None: path = join(get_resource_path(2), "images") # Make sure that the specified path is a directory: if not isdir(path): raise TraitError( "The image volume path '%s' does not exist." % path ) # Create the ImageVolume to describe the path's contents: image_volume_path = join(path, "image_volume.py") if exists(image_volume_path): volume = get_python_value(read_file(image_volume_path), "volume") else: volume = ImageVolume() # Set up the rest of the volume information: volume.trait_set(name=volume_name, path=path, is_zip_file=False) # Try to bring the volume information up to date if necessary: if volume.time_stamp < time_stamp_for(stat(path)[ST_MTIME]): # Note that the save could fail if the volume is read-only, but # that's OK, because we're only trying to do the save in case # a developer had added or deleted some image files, which would # require write access to the volume: volume.save() # Add the new volume to the library: self.catalog[volume_name] = volume self.volumes.append(volume) def extract(self, file_name, image_names): """ Builds a new image volume called **file_name** from the list of image names specified by **image_names**. Each image name should be of the form: '@volume:name'. """ # Get the volume name and file extension: volume_name, ext = splitext(basename(file_name)) # If no extension specified, add the '.zip' file extension: if ext == "": file_name += ".zip" # Create the ImageVolume object to describe the new volume: volume = ImageVolume(name=volume_name) # Make sure the zip file does not already exists: if exists(file_name): raise TraitError("The '%s' file already exists." % file_name) # Create the zip file: zf = ZipFile(file_name, "w", ZIP_DEFLATED) # Add each of the specified images to it and the ImageVolume: error = True aliases = set() keywords = set() images = [] info = {} try: for image_name in set(image_names): # Verify the image name is legal: if (image_name[:1] != "@") or (image_name.find(":") < 0): raise TraitError( ( "The image name specified by '%s' is " "not of the form: @volume:name." ) % image_name ) # Get the reference volume and image file names: image_volume_name, image_file_name = split_image_name( image_name ) # Get the volume for the image name: image_volume = self.find_volume(image_name) if image_volume is None: raise TraitError( ( "Could not find the image volume " "specified by '%s'." ) % image_name ) # Get the image info: image_info = image_volume.catalog.get(image_name) if image_info is None: raise TraitError( ("Could not find the image specified by " "'%s'.") % image_name ) # Add the image info to the list of images: images.append(image_info) # Add the image file to the zip file: zf.writestr( image_file_name, image_volume.image_data(image_name) ) # Add the volume alias needed by the image (if any): if image_volume_name != volume_name: if image_volume_name not in aliases: aliases.add(image_volume_name) # Add the volume keywords as well: for keyword in image_volume.keywords: keywords.add(keyword) # Add the volume info for the image: volume_info = image_volume.volume_info(image_name) vinfo = info.get(image_volume_name) if vinfo is None: info[image_volume_name] = vinfo = volume_info.clone() vinfo.image_names.append(image_name) # Create the list of images for the volume: images.sort(key=lambda item: item.image_name) volume.images = images # Create the list of aliases for the volume: volume.aliases = list(aliases) # Create the list of keywords for the volume: volume.keywords = list(keywords) # Create the final volume info list for the volume: volume.info = list(info.values()) # Write the volume manifest source code to the zip file: zf.writestr("image_volume.py", volume.image_volume_code) # Write the image info source code to the zip file: zf.writestr("image_info.py", volume.images_code) # Write a separate licenses file for human consumption: zf.writestr("license.txt", volume.license_text) # Indicate no errors occurred: error = False finally: zf.close() if error: remove(file_name) # -- Default Value Implementations ------------------------------------------ def _volumes_default(self): result = [] # Check for and add the 'application' image library: app_library = join(dirname(abspath(sys.argv[0])), "library") if isdir(app_library): result.extend(self._add_path(app_library)) # Get all volumes in the standard Traits UI image library directory: result.extend(self._add_path(join(get_resource_path(1), "library"))) # Check to see if there is an environment variable specifying a list # of paths containing image libraries: paths = environ.get("TRAITS_IMAGES") if paths is not None: # Determine the correct OS path separator to use: separator = ";" if system() != "Windows": separator = ":" # Add all image volumes found in each path in the environment # variable: for path in paths.split(separator): result.extend(self._add_path(path)) # Return the list of default volumes found: return result def _catalog_default(self): return dict([(volume.name, volume) for volume in self.volumes]) # -- Property Implementations ----------------------------------------------- @cached_property def _get_images(self): return self._get_images_list() # -- Private Methods -------------------------------------------------------- def _get_images_list(self): """ Returns the list of all library images. """ # Merge the list of images from each volume: images = [] for volume in self.volumes: images.extend(volume.images) # Sort the result: images.sort(key=lambda image: image.image_name) # Return the images list: return images def _add_path(self, path): """ Returns a list of ImageVolume objects, one for each image library located in the specified **path**. """ result = [] # Make sure the path is a directory: if isdir(path): # Find each zip file in the directory: for base in listdir(path): if splitext(base)[1] == ".zip": # Try to create a volume from the zip file and add it to # the result: volume = self._add_volume(join(path, base)) if volume is not None: result.append(volume) # Return the list of volumes found: return result def _add_volume(self, path): """ Returns an ImageVolume object for the image library specified by **path**. If **path** does not specify a valid ImageVolume, None is returned. """ path = abspath(path) # Make sure the path is a valid zip file: if is_zipfile(path): # Create a fast zip file for reading: zf = FastZipFile(path=path) # Extract the volume name from the path: volume_name = splitext(basename(path))[0] # Get the names of all top-level entries in the zip file: names = zf.namelist() # Check to see if there is a manifest file: if "image_volume.py" in names: # Load the manifest code and extract the volume object: volume = get_python_value(zf.read("image_volume.py"), "volume") # Set the volume name: volume.name = volume_name # Try to add all of the external volume references as # aliases for this volume: self._add_aliases(volume) # Set the path to this volume: volume.path = path # Save the reference to the zip file object we are using: volume.zip_file = zf else: # Create a new volume from the zip file: volume = ImageVolume(name=volume_name, path=path, zip_file=zf) # If this volume is not up to date, update it: if volume.time_stamp < time_stamp_for(stat(path)[ST_MTIME]): # Note that the save could fail if the volume is read-only, but # that's OK, because we're only trying to do the save in case # a developer had added or deleted some image files, which would # require write access to the volume: volume.save() # Return the volume: return volume # Indicate no volume was found: return None def _add_aliases(self, volume): """ Try to add all of the external volume references as aliases for this volume. """ aliases = self.aliases volume_name = volume.name for vname in volume.aliases: if (vname in aliases) and (volume_name != aliases[vname]): raise TraitError( ( "Image library error: " "Attempt to alias '%s' to '%s' when it is " "already aliased to '%s'" ) % (vname, volume_name, aliases[volume_name]) ) aliases[vname] = volume_name def _duplicate_volume(self, volume_name): """ Raises a duplicate volume name error. """ raise TraitError( ( "Attempted to add an image volume called '%s' when " "a volume with that name is already defined." ) % volume_name ) # Create the singleton image object: ImageLibrary = ImageLibrary()